--- /dev/null
+CREATE TABLE process_blockers (
+ process INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY (process, condition),
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE todo_blockers (
+ todo INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY (todo, condition),
+ FOREIGN KEY (todo) REFERENCES todos(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+++ /dev/null
-CREATE TABLE condition_descriptions (
- parent INTEGER NOT NULL,
- timestamp TEXT NOT NULL,
- description TEXT NOT NULL,
- PRIMARY KEY (parent, timestamp),
- FOREIGN KEY (parent) REFERENCES conditions(id)
-);
-CREATE TABLE condition_titles (
- parent INTEGER NOT NULL,
- timestamp TEXT NOT NULL,
- title TEXT NOT NULL,
- PRIMARY KEY (parent, timestamp),
- FOREIGN KEY (parent) REFERENCES conditions(id)
-);
-CREATE TABLE conditions (
- id INTEGER PRIMARY KEY,
- is_active BOOLEAN NOT NULL
-);
-CREATE TABLE days (
- id TEXT PRIMARY KEY,
- comment TEXT NOT NULL
-);
-CREATE TABLE process_conditions (
- process INTEGER NOT NULL,
- condition INTEGER NOT NULL,
- PRIMARY KEY (process, condition),
- FOREIGN KEY (process) REFERENCES processes(id),
- FOREIGN KEY (condition) REFERENCES conditions(id)
-);
-CREATE TABLE process_descriptions (
- parent INTEGER NOT NULL,
- timestamp TEXT NOT NULL,
- description TEXT NOT NULL,
- PRIMARY KEY (parent, timestamp),
- FOREIGN KEY (parent) REFERENCES processes(id)
-);
-CREATE TABLE process_disables (
- process INTEGER NOT NULL,
- condition INTEGER NOT NULL,
- PRIMARY KEY(process, condition),
- FOREIGN KEY (process) REFERENCES processes(id),
- FOREIGN KEY (condition) REFERENCES conditions(id)
-);
-CREATE TABLE process_efforts (
- parent INTEGER NOT NULL,
- timestamp TEXT NOT NULL,
- effort REAL NOT NULL,
- PRIMARY KEY (parent, timestamp),
- FOREIGN KEY (parent) REFERENCES processes(id)
-);
-CREATE TABLE process_enables (
- process INTEGER NOT NULL,
- condition INTEGER NOT NULL,
- PRIMARY KEY(process, condition),
- FOREIGN KEY (process) REFERENCES processes(id),
- FOREIGN KEY (condition) REFERENCES conditions(id)
-);
-CREATE TABLE process_steps (
- id INTEGER PRIMARY KEY,
- owner INTEGER NOT NULL,
- step_process INTEGER NOT NULL,
- parent_step INTEGER,
- FOREIGN KEY (owner) REFERENCES processes(id),
- FOREIGN KEY (step_process) REFERENCES processes(id),
- FOREIGN KEY (parent_step) REFERENCES process_steps(step_id)
-);
-CREATE TABLE process_titles (
- parent INTEGER NOT NULL,
- timestamp TEXT NOT NULL,
- title TEXT NOT NULL,
- PRIMARY KEY (parent, timestamp),
- FOREIGN KEY (parent) REFERENCES processes(id)
-);
-CREATE TABLE processes (
- id INTEGER PRIMARY KEY,
- calendarize BOOLEAN NOT NULL DEFAULT FALSE
-);
-CREATE TABLE todo_children (
- parent INTEGER NOT NULL,
- child INTEGER NOT NULL,
- PRIMARY KEY (parent, child),
- FOREIGN KEY (parent) REFERENCES todos(id),
- FOREIGN KEY (child) REFERENCES todos(id)
-);
-CREATE TABLE todo_conditions (
- todo INTEGER NOT NULL,
- condition INTEGER NOT NULL,
- PRIMARY KEY(todo, condition),
- FOREIGN KEY (todo) REFERENCES todos(id),
- FOREIGN KEY (condition) REFERENCES conditions(id)
-);
-CREATE TABLE todo_disables (
- todo INTEGER NOT NULL,
- condition INTEGER NOT NULL,
- PRIMARY KEY(todo, condition),
- FOREIGN KEY (todo) REFERENCES todos(id),
- FOREIGN KEY (condition) REFERENCES conditions(id)
-);
-CREATE TABLE todo_enables (
- todo INTEGER NOT NULL,
- condition INTEGER NOT NULL,
- PRIMARY KEY(todo, condition),
- FOREIGN KEY (todo) REFERENCES todos(id),
- FOREIGN KEY (condition) REFERENCES conditions(id)
-);
-CREATE TABLE todos (
- id INTEGER PRIMARY KEY,
- process INTEGER NOT NULL,
- is_done BOOLEAN NOT NULL,
- day TEXT NOT NULL,
- comment TEXT NOT NULL DEFAULT "",
- effort REAL,
- calendarize BOOLEAN NOT NULL DEFAULT FALSE,
- FOREIGN KEY (process) REFERENCES processes(id),
- FOREIGN KEY (day) REFERENCES days(id)
-);
--- /dev/null
+CREATE TABLE condition_descriptions (
+ parent INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ description TEXT NOT NULL,
+ PRIMARY KEY (parent, timestamp),
+ FOREIGN KEY (parent) REFERENCES conditions(id)
+);
+CREATE TABLE condition_titles (
+ parent INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ title TEXT NOT NULL,
+ PRIMARY KEY (parent, timestamp),
+ FOREIGN KEY (parent) REFERENCES conditions(id)
+);
+CREATE TABLE conditions (
+ id INTEGER PRIMARY KEY,
+ is_active BOOLEAN NOT NULL
+);
+CREATE TABLE days (
+ id TEXT PRIMARY KEY,
+ comment TEXT NOT NULL
+);
+CREATE TABLE process_blockers (
+ process INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY (process, condition),
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE process_conditions (
+ process INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY (process, condition),
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE process_descriptions (
+ parent INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ description TEXT NOT NULL,
+ PRIMARY KEY (parent, timestamp),
+ FOREIGN KEY (parent) REFERENCES processes(id)
+);
+CREATE TABLE process_disables (
+ process INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(process, condition),
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE process_efforts (
+ parent INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ effort REAL NOT NULL,
+ PRIMARY KEY (parent, timestamp),
+ FOREIGN KEY (parent) REFERENCES processes(id)
+);
+CREATE TABLE process_enables (
+ process INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(process, condition),
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE process_steps (
+ id INTEGER PRIMARY KEY,
+ owner INTEGER NOT NULL,
+ step_process INTEGER NOT NULL,
+ parent_step INTEGER,
+ FOREIGN KEY (owner) REFERENCES processes(id),
+ FOREIGN KEY (step_process) REFERENCES processes(id),
+ FOREIGN KEY (parent_step) REFERENCES process_steps(step_id)
+);
+CREATE TABLE process_titles (
+ parent INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ title TEXT NOT NULL,
+ PRIMARY KEY (parent, timestamp),
+ FOREIGN KEY (parent) REFERENCES processes(id)
+);
+CREATE TABLE processes (
+ id INTEGER PRIMARY KEY,
+ calendarize BOOLEAN NOT NULL DEFAULT FALSE
+);
+CREATE TABLE todo_blockers (
+ todo INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY (todo, condition),
+ FOREIGN KEY (todo) REFERENCES todos(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE todo_children (
+ parent INTEGER NOT NULL,
+ child INTEGER NOT NULL,
+ PRIMARY KEY (parent, child),
+ FOREIGN KEY (parent) REFERENCES todos(id),
+ FOREIGN KEY (child) REFERENCES todos(id)
+);
+CREATE TABLE todo_conditions (
+ todo INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(todo, condition),
+ FOREIGN KEY (todo) REFERENCES todos(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE todo_disables (
+ todo INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(todo, condition),
+ FOREIGN KEY (todo) REFERENCES todos(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE todo_enables (
+ todo INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(todo, condition),
+ FOREIGN KEY (todo) REFERENCES todos(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE todos (
+ id INTEGER PRIMARY KEY,
+ process INTEGER NOT NULL,
+ is_done BOOLEAN NOT NULL,
+ day TEXT NOT NULL,
+ comment TEXT NOT NULL DEFAULT "",
+ effort REAL,
+ calendarize BOOLEAN NOT NULL DEFAULT FALSE,
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (day) REFERENCES days(id)
+);
table_name = 'conditions'
to_save = ['is_active']
to_save_versioned = ['title', 'description']
+ to_search = ['title.newest', 'description.newest']
def __init__(self, id_: int | None, is_active: bool = False) -> None:
super().__init__(id_)
if self.id_ is None:
raise HandledException('cannot remove unsaved item')
for item in ('process', 'todo'):
- for attr in ('conditions', 'enables', 'disables'):
+ for attr in ('conditions', 'blockers', 'enables', 'disables'):
table_name = f'{item}_{attr}'
for _ in db_conn.row_where(table_name, 'condition', self.id_):
raise HandledException('cannot remove Condition in use')
class ConditionsRelations:
"""Methods for handling relations to Conditions, for Todo and Process."""
+ def __init__(self) -> None:
+ self.conditions: list[Condition] = []
+ self.blockers: list[Condition] = []
+ self.enables: list[Condition] = []
+ self.disables: list[Condition] = []
+
def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
target: str = 'conditions') -> None:
"""Set self.[target] to Conditions identified by ids."""
for id_ in ids:
target_list += [Condition.by_id(db_conn, id_)]
+ def set_blockers(self, db_conn: DatabaseConnection,
+ ids: list[int]) -> None:
+ """Set self.enables to Conditions identified by ids."""
+ self.set_conditions(db_conn, ids, 'blockers')
+
def set_enables(self, db_conn: DatabaseConnection,
ids: list[int]) -> None:
"""Set self.enables to Conditions identified by ids."""
--- /dev/null
+"""Various utilities for handling dates."""
+from datetime import datetime, timedelta
+from plomtask.exceptions import BadFormatException
+
+DATE_FORMAT = '%Y-%m-%d'
+
+
+def valid_date(date_str: str) -> str:
+ """Validate date against DATE_FORMAT or 'today'/'yesterday'/'tomorrow.
+
+ In any case, returns in DATE_FORMAT.
+ """
+ if date_str == 'today':
+ date_str = date_in_n_days(0)
+ elif date_str == 'yesterday':
+ date_str = date_in_n_days(-1)
+ elif date_str == 'tomorrow':
+ date_str = date_in_n_days(1)
+ try:
+ dt = datetime.strptime(date_str, DATE_FORMAT)
+ except (ValueError, TypeError) as e:
+ msg = f'Given date of wrong format: {date_str}'
+ raise BadFormatException(msg) from e
+ return dt.strftime(DATE_FORMAT)
+
+
+def date_in_n_days(n: int) -> str:
+ """Return in DATE_FORMAT date from today + n days."""
+ date = datetime.now() + timedelta(days=n)
+ return date.strftime(DATE_FORMAT)
"""Collecting Day and date-related items."""
from __future__ import annotations
from datetime import datetime, timedelta
-from plomtask.exceptions import BadFormatException
from plomtask.db import DatabaseConnection, BaseModel
from plomtask.todos import Todo
-
-DATE_FORMAT = '%Y-%m-%d'
-MIN_RANGE_DATE = '2024-01-01'
-MAX_RANGE_DATE = '2030-12-31'
-
-
-def valid_date(date_str: str) -> str:
- """Validate date against DATE_FORMAT or 'today', return in DATE_FORMAT."""
- if date_str == 'today':
- date_str = todays_date()
- try:
- dt = datetime.strptime(date_str, DATE_FORMAT)
- except (ValueError, TypeError) as e:
- msg = f'Given date of wrong format: {date_str}'
- raise BadFormatException(msg) from e
- return dt.strftime(DATE_FORMAT)
-
-
-def todays_date() -> str:
- """Return current date in DATE_FORMAT."""
- return datetime.now().strftime(DATE_FORMAT)
+from plomtask.dating import (DATE_FORMAT, valid_date)
class Day(BaseModel[str]):
return self.date < other.date
@classmethod
- def all(cls, db_conn: DatabaseConnection,
- date_range: tuple[str, str] = ('', ''),
- fill_gaps: bool = False) -> list[Day]:
- """Return list of Days in database within (open) date_range interval.
-
- If no range values provided, defaults them to MIN_RANGE_DATE and
- MAX_RANGE_DATE. Also knows to properly interpret 'today' as value.
+ def by_date_range_filled(cls, db_conn: DatabaseConnection,
+ start: str, end: str) -> list[Day]:
+ """Return days existing and non-existing between dates start/end."""
+ ret = cls.by_date_range_with_limits(db_conn, (start, end), 'id')
+ days, start_date, end_date = ret
+ return cls.with_filled_gaps(days, start_date, end_date)
- On fill_gaps=True, will instantiate (without saving) Days of all dates
- within the date range that don't exist yet.
- """
- min_date = '2024-01-01'
- max_date = '2030-12-31'
- start_date = valid_date(date_range[0] if date_range[0] else min_date)
- end_date = valid_date(date_range[1] if date_range[1] else max_date)
- days = []
- sql = 'SELECT id FROM days WHERE id >= ? AND id <= ?'
- for row in db_conn.exec(sql, (start_date, end_date)):
- days += [cls.by_id(db_conn, row[0])]
+ @classmethod
+ def with_filled_gaps(cls, days: list[Day], start_date: str, end_date: str
+ ) -> list[Day]:
+ """In days, fill with (un-saved) Days gaps between start/end_date."""
+ if start_date > end_date:
+ return days
days.sort()
- if fill_gaps and len(days) > 1:
+ if start_date not in [d.date for d in days]:
+ days[:] = [Day(start_date)] + days
+ if end_date not in [d.date for d in days]:
+ days += [Day(end_date)]
+ if len(days) > 1:
gapless_days = []
for i, day in enumerate(days):
gapless_days += [day]
while day.next_date != days[i+1].date:
day = Day(day.next_date)
gapless_days += [day]
- days = gapless_days
+ days[:] = gapless_days
return days
@property
from sqlite3 import connect as sql_connect, Cursor, Row
from typing import Any, Self, TypeVar, Generic
from plomtask.exceptions import HandledException, NotFoundException
+from plomtask.dating import valid_date
-EXPECTED_DB_VERSION = 3
+EXPECTED_DB_VERSION = 4
MIGRATIONS_DIR = 'migrations'
FILENAME_DB_SCHEMA = f'init_{EXPECTED_DB_VERSION}.sql'
PATH_DB_SCHEMA = f'{MIGRATIONS_DIR}/{FILENAME_DB_SCHEMA}'
return list(self.exec(f'SELECT * FROM {table_name} WHERE {key} = ?',
(target,)))
+ # def column_where_pattern(self,
+ # table_name: str,
+ # column: str,
+ # pattern: str,
+ # keys: list[str]) -> list[Any]:
+ # """Return column of rows where one of keys matches pattern."""
+ # targets = tuple([f'%{pattern}%'] * len(keys))
+ # haystack = ' OR '.join([f'{k} LIKE ?' for k in keys])
+ # sql = f'SELECT {column} FROM {table_name} WHERE {haystack}'
+ # return [row[0] for row in self.exec(sql, targets)]
+
def column_where(self, table_name: str, column: str, key: str,
target: int | str) -> list[Any]:
"""Return column of table where key == target."""
to_save_relations: list[tuple[str, str, str]] = []
id_: None | BaseModelId
cache_: dict[BaseModelId, Self]
+ to_search: list[str] = []
def __init__(self, id_: BaseModelId | None) -> None:
if isinstance(id_, int) and id_ < 1:
items[item.id_] = item
return list(items.values())
+ @classmethod
+ def by_date_range_with_limits(cls: type[BaseModelInstance],
+ db_conn: DatabaseConnection,
+ date_range: tuple[str, str],
+ date_col: str = 'day'
+ ) -> tuple[list[BaseModelInstance], str,
+ str]:
+ """Return list of Days in database within (open) date_range interval.
+
+ If no range values provided, defaults them to 'yesterday' and
+ 'tomorrow'. Knows to properly interpret these and 'today' as value.
+ """
+ start_str = date_range[0] if date_range[0] else 'yesterday'
+ end_str = date_range[1] if date_range[1] else 'tomorrow'
+ start_date = valid_date(start_str)
+ end_date = valid_date(end_str)
+ items = []
+ sql = f'SELECT id FROM {cls.table_name} '
+ sql += f'WHERE {date_col} >= ? AND {date_col} <= ?'
+ for row in db_conn.exec(sql, (start_date, end_date)):
+ items += [cls.by_id(db_conn, row[0])]
+ return items, start_date, end_date
+
+ @classmethod
+ def matching(cls: type[BaseModelInstance], db_conn: DatabaseConnection,
+ pattern: str) -> list[BaseModelInstance]:
+ """Return all objects whose .to_search match pattern."""
+ items = cls.all(db_conn)
+ if pattern:
+ filtered = []
+ for item in items:
+ for attr_name in cls.to_search:
+ toks = attr_name.split('.')
+ parent = item
+ for tok in toks:
+ attr = getattr(parent, tok)
+ parent = attr
+ if pattern in attr:
+ filtered += [item]
+ break
+ return filtered
+ return items
+
def save(self, db_conn: DatabaseConnection) -> None:
"""Write self to DB and cache and ensure .id_.
from urllib.parse import urlparse, parse_qs
from os.path import split as path_split
from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
-from plomtask.days import Day, todays_date
+from plomtask.dating import date_in_n_days
+from plomtask.days import Day
from plomtask.exceptions import HandledException, BadFormatException, \
NotFoundException
from plomtask.db import DatabaseConnection, DatabaseFile
"""Show Days from ?start= to ?end=."""
start = self.params.get_str('start')
end = self.params.get_str('end')
- days = Day.all(self.conn, date_range=(start, end), fill_gaps=True)
+ if not end:
+ end = date_in_n_days(60)
+ ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
+ days, start, end = ret
+ days = Day.with_filled_gaps(days, start, end)
for day in days:
day.collect_calendarized_todos(self.conn)
- return {'start': start, 'end': end, 'days': days}
+ today = date_in_n_days(0)
+ return {'start': start, 'end': end, 'days': days, 'today': today}
def do_GET_day(self) -> dict[str, object]:
"""Show single Day of ?date=."""
- date = self.params.get_str('date', todays_date())
+ date = self.params.get_str('date', date_in_n_days(0))
todays_todos = Todo.by_date(self.conn, date)
conditions_present = []
enablers_for = {}
+ disablers_for = {}
for todo in todays_todos:
- for condition in todo.conditions:
+ for condition in todo.conditions + todo.blockers:
if condition not in conditions_present:
conditions_present += [condition]
enablers_for[condition.id_] = [p for p in
Process.all(self.conn)
if condition in p.enables]
+ disablers_for[condition.id_] = [p for p in
+ Process.all(self.conn)
+ if condition in p.disables]
seen_todos: set[int] = set()
top_nodes = [t.get_step_tree(seen_todos)
for t in todays_todos if not t.parents]
return {'day': Day.by_id(self.conn, date, create=True),
'top_nodes': top_nodes,
'enablers_for': enablers_for,
+ 'disablers_for': disablers_for,
'conditions_present': conditions_present,
'processes': Process.all(self.conn)}
'todo_candidates': Todo.by_date(self.conn, todo.date),
'condition_candidates': Condition.all(self.conn)}
+ def do_GET_todos(self) -> dict[str, object]:
+ """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
+ sort_by = self.params.get_str('sort_by')
+ start = self.params.get_str('start')
+ end = self.params.get_str('end')
+ process_id = self.params.get_int_or_none('process_id')
+ comment_pattern = self.params.get_str('comment_pattern')
+ todos = []
+ ret = Todo.by_date_range_with_limits(self.conn, (start, end))
+ todos_by_date_range, start, end = ret
+ todos = [t for t in todos_by_date_range
+ if comment_pattern in t.comment
+ and ((not process_id) or t.process.id_ == process_id)]
+ if sort_by == 'doneness':
+ todos.sort(key=lambda t: t.is_done)
+ elif sort_by == '-doneness':
+ todos.sort(key=lambda t: t.is_done, reverse=True)
+ elif sort_by == 'process':
+ todos.sort(key=lambda t: t.title_then)
+ elif sort_by == '-process':
+ todos.sort(key=lambda t: t.title_then, reverse=True)
+ elif sort_by == 'comment':
+ todos.sort(key=lambda t: t.comment)
+ elif sort_by == '-comment':
+ todos.sort(key=lambda t: t.comment, reverse=True)
+ elif sort_by == '-date':
+ todos.sort(key=lambda t: t.date, reverse=True)
+ else:
+ todos.sort(key=lambda t: t.date)
+ return {'start': start, 'end': end, 'process_id': process_id,
+ 'comment_pattern': comment_pattern, 'todos': todos,
+ 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
+
def do_GET_conditions(self) -> dict[str, object]:
"""Show all Conditions."""
- conditions = Condition.all(self.conn)
+ pattern = self.params.get_str('pattern')
+ conditions = Condition.matching(self.conn, pattern)
sort_by = self.params.get_str('sort_by')
if sort_by == 'is_active':
conditions.sort(key=lambda c: c.is_active)
conditions.sort(key=lambda c: c.title.newest, reverse=True)
else:
conditions.sort(key=lambda c: c.title.newest)
- return {'conditions': conditions, 'sort_by': sort_by}
+ return {'conditions': conditions,
+ 'sort_by': sort_by,
+ 'pattern': pattern}
def do_GET_condition(self) -> dict[str, object]:
"""Show Condition of ?id=."""
id_ = self.params.get_int_or_none('id')
- return {'condition': Condition.by_id(self.conn, id_, create=True)}
+ c = Condition.by_id(self.conn, id_, create=True)
+ ps = Process.all(self.conn)
+ return {'condition': c,
+ 'enabled_processes': [p for p in ps if c in p.conditions],
+ 'disabled_processes': [p for p in ps if c in p.blockers],
+ 'enabling_processes': [p for p in ps if c in p.enables],
+ 'disabling_processes': [p for p in ps if c in p.disables]}
def do_GET_condition_titles(self) -> dict[str, object]:
"""Show title history of Condition of ?id=."""
return {'process': process,
'steps': process.get_steps(self.conn),
'owners': process.used_as_step_by(self.conn),
+ 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
'step_candidates': Process.all(self.conn),
'condition_candidates': Condition.all(self.conn)}
def do_GET_processes(self) -> dict[str, object]:
"""Show all Processes."""
- processes = Process.all(self.conn)
+ pattern = self.params.get_str('pattern')
+ processes = Process.matching(self.conn, pattern)
sort_by = self.params.get_str('sort_by')
if sort_by == 'steps':
- processes.sort(key=lambda c: len(c.explicit_steps))
+ processes.sort(key=lambda p: len(p.explicit_steps))
elif sort_by == '-steps':
- processes.sort(key=lambda c: len(c.explicit_steps), reverse=True)
+ processes.sort(key=lambda p: len(p.explicit_steps), reverse=True)
elif sort_by == '-title':
- processes.sort(key=lambda c: c.title.newest, reverse=True)
+ processes.sort(key=lambda p: p.title.newest, reverse=True)
else:
- processes.sort(key=lambda c: c.title.newest)
- return {'processes': processes, 'sort_by': sort_by}
+ processes.sort(key=lambda p: p.title.newest)
+ return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
def do_POST(self) -> None:
"""Handle any POST request."""
day = Day.by_id(self.conn, date, create=True)
day.comment = self.form_data.get_str('day_comment')
day.save(self.conn)
- new_todos = []
- for process_id in self.form_data.get_all_int('new_todo'):
- process = Process.by_id(self.conn, process_id)
- todo = Todo(None, process, False, day.date)
- todo.save(self.conn)
- new_todos += [todo]
- adopted = True
- while adopted:
- adopted = False
- existing_todos = Todo.by_date(self.conn, date)
- for todo in new_todos:
- if todo.adopt_from(existing_todos):
- adopted = True
- todo.make_missing_children(self.conn)
- todo.save(self.conn)
+ Todo.create_with_children(self.conn, date,
+ self.form_data.get_all_int('new_todo'))
done_ids = self.form_data.get_all_int('done')
comments = self.form_data.get_all_str('comment')
efforts = self.form_data.get_all_str('effort')
effort = self.form_data.get_str('effort', ignore_strict=True)
todo.effort = float(effort) if effort else None
todo.set_conditions(self.conn, self.form_data.get_all_int('condition'))
+ todo.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
todo.set_enables(self.conn, self.form_data.get_all_int('enables'))
todo.set_disables(self.conn, self.form_data.get_all_int('disables'))
todo.is_done = len(self.form_data.get_all_str('done')) > 0
process.effort.set(self.form_data.get_float('effort'))
process.set_conditions(self.conn,
self.form_data.get_all_int('condition'))
+ process.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
process.set_enables(self.conn, self.form_data.get_all_int('enables'))
process.set_disables(self.conn, self.form_data.get_all_int('disables'))
process.calendarize = self.form_data.get_all_str('calendarize') != []
process.save(self.conn)
- process.explicit_steps = []
steps: list[tuple[int | None, int, int | None]] = []
+ for step_id in self.form_data.get_all_int('keep_step'):
+ if step_id not in self.form_data.get_all_int('steps'):
+ raise BadFormatException('trying to keep unknown step')
for step_id in self.form_data.get_all_int('steps'):
for step_process_id in self.form_data.get_all_int(
f'new_step_to_{step_id}'):
to_save = ['calendarize']
to_save_versioned = ['title', 'description', 'effort']
to_save_relations = [('process_conditions', 'process', 'conditions'),
+ ('process_blockers', 'process', 'blockers'),
('process_enables', 'process', 'enables'),
('process_disables', 'process', 'disables')]
+ to_search = ['title.newest', 'description.newest']
def __init__(self, id_: int | None, calendarize: bool = False) -> None:
- super().__init__(id_)
+ BaseModel.__init__(self, id_)
+ ConditionsRelations.__init__(self)
self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
self.description = VersionedAttribute(self, 'process_descriptions', '')
self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
self.explicit_steps: list[ProcessStep] = []
self.calendarize = calendarize
- self.conditions: list[Condition] = []
- self.enables: list[Condition] = []
- self.disables: list[Condition] = []
@classmethod
def from_table_row(cls, db_conn: DatabaseConnection,
process.id_):
step = ProcessStep.from_table_row(db_conn, row_)
process.explicit_steps += [step] # pylint: disable=no-member
- for name in ('conditions', 'enables', 'disables'):
+ for name in ('conditions', 'blockers', 'enables', 'disables'):
table = f'process_{name}'
assert isinstance(process.id_, int)
for c_id in db_conn.column_where(table, 'condition',
from plomtask.conditions import Condition, ConditionsRelations
from plomtask.exceptions import (NotFoundException, BadFormatException,
HandledException)
+from plomtask.dating import valid_date
@dataclass
to_save = ['process_id', 'is_done', 'date', 'comment', 'effort',
'calendarize']
to_save_relations = [('todo_conditions', 'todo', 'conditions'),
+ ('todo_blockers', 'todo', 'blockers'),
('todo_enables', 'todo', 'enables'),
('todo_disables', 'todo', 'disables'),
('todo_children', 'parent', 'children'),
('todo_children', 'child', 'parents')]
+ to_search = ['comment']
# pylint: disable=too-many-arguments
def __init__(self, id_: int | None,
date: str, comment: str = '',
effort: None | float = None,
calendarize: bool = False) -> None:
- super().__init__(id_)
+ BaseModel.__init__(self, id_)
+ ConditionsRelations.__init__(self)
if process.id_ is None:
raise NotFoundException('Process of Todo without ID (not saved?)')
self.process = process
self._is_done = is_done
- self.date = date
+ self.date = valid_date(date)
self.comment = comment
self.effort = effort
self.children: list[Todo] = []
self.parents: list[Todo] = []
self.calendarize = calendarize
- self.conditions: list[Condition] = []
- self.enables: list[Condition] = []
- self.disables: list[Condition] = []
if not self.id_:
self.calendarize = self.process.calendarize
self.conditions = self.process.conditions[:]
+ self.blockers = self.process.blockers[:]
self.enables = self.process.enables[:]
self.disables = self.process.disables[:]
+ @classmethod
+ def by_date_range(cls, db_conn: DatabaseConnection,
+ date_range: tuple[str, str] = ('', '')) -> list[Todo]:
+ """Collect Todos of Days within date_range."""
+ todos, _, _ = cls.by_date_range_with_limits(db_conn, date_range)
+ return todos
+
+ @classmethod
+ def create_with_children(cls, db_conn: DatabaseConnection, date: str,
+ process_ids: list[int]) -> list[Todo]:
+ """Create Todos of process_ids for date, ensure children."""
+ new_todos = []
+ for process_id in process_ids:
+ process = Process.by_id(db_conn, process_id)
+ todo = Todo(None, process, False, date)
+ todo.save(db_conn)
+ new_todos += [todo]
+ nothing_to_adopt = False
+ while not nothing_to_adopt:
+ nothing_to_adopt = True
+ existing_todos = Todo.by_date(db_conn, date)
+ for todo in new_todos:
+ if todo.adopt_from(existing_todos):
+ nothing_to_adopt = False
+ todo.make_missing_children(db_conn)
+ todo.save(db_conn)
+ return new_todos
+
@classmethod
def from_table_row(cls, db_conn: DatabaseConnection,
row: Row | list[Any]) -> Todo:
'child', todo.id_):
# pylint: disable=no-member
todo.parents += [cls.by_id(db_conn, t_id)]
- for name in ('conditions', 'enables', 'disables'):
+ for name in ('conditions', 'blockers', 'enables', 'disables'):
table = f'todo_{name}'
assert isinstance(todo.id_, int)
for cond_id in db_conn.column_where(table, 'condition',
target += [Condition.by_id(db_conn, cond_id)]
return todo
+ @classmethod
+ def by_process_id(cls, db_conn: DatabaseConnection,
+ process_id: int | None) -> list[Todo]:
+ """Collect all Todos of Process of process_id."""
+ return [t for t in cls.all(db_conn) if t.process.id_ == process_id]
+
@classmethod
def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
"""Collect all Todos for Day of date."""
- todos = []
- for id_ in db_conn.column_where('todos', 'id', 'day', date):
- todos += [cls.by_id(db_conn, id_)]
- return todos
+ return cls.by_date_range(db_conn, (date, date))
@property
def is_doable(self) -> bool:
for condition in self.conditions:
if not condition.is_active:
return False
+ for condition in self.blockers:
+ if condition.is_active:
+ return False
+ return True
+
+ @property
+ def is_deletable(self) -> bool:
+ """Decide whether self be deletable (not if preserve-worthy values)."""
+ if self.comment:
+ return False
+ if self.effort and self.effort >= 0:
+ return False
return True
@property
"""Shortcut to .process.title."""
return self.process.title
+ @property
+ def title_then(self) -> str:
+ """Shortcut to .process.title.at(self.date)"""
+ title_then = self.process.title.at(self.date)
+ assert isinstance(title_then, str)
+ return title_then
+
+ @property
+ def effort_then(self) -> float:
+ """Shortcut to .process.effort.at(self.date)"""
+ effort_then = self.process.effort.at(self.date)
+ assert isinstance(effort_then, float)
+ return effort_then
+
def adopt_from(self, todos: list[Todo]) -> bool:
"""As far as possible, fill unsatisfied dependencies from todos."""
adopted = False
def make_missing_children(self, db_conn: DatabaseConnection) -> None:
"""Fill unsatisfied dependencies with new Todos."""
- for process_id in self.unsatisfied_dependencies:
- process = Process.by_id(db_conn, process_id)
- todo = self.__class__(None, process, False, self.date)
- todo.save(db_conn)
+ new_todos = self.__class__.create_with_children(
+ db_conn, self.date, self.unsatisfied_dependencies)
+ for todo in new_todos:
self.add_child(todo)
def get_step_tree(self, seen_todos: set[int]) -> TodoNode:
self.children.remove(child)
child.parents.remove(self)
+ def save(self, db_conn: DatabaseConnection) -> None:
+ """On save calls, also check if auto-deletion by effort < 0."""
+ if self.effort and self.effort < 0 and self.is_deletable:
+ self.remove(db_conn)
+ return
+ super().save(db_conn)
+
def remove(self, db_conn: DatabaseConnection) -> None:
"""Remove from DB, including relations."""
+ if not self.is_deletable:
+ raise HandledException('Cannot remove non-deletable Todo.')
children_to_remove = self.children[:]
parents_to_remove = self.parents[:]
for child in children_to_remove:
def at(self, queried_time: str) -> str | float:
"""Retrieve value of timestamp nearest queried_time from the past."""
+ if len(queried_time) == 10:
+ queried_time += ' 23:59:59.999'
sorted_timestamps = sorted(self.history.keys())
if 0 == len(sorted_timestamps):
return self.default
Jinja2==3.1.3
+unittest-parallel==1.6.1
echo "Running pylint on ${dir}/ …"
python3 -m pylint ${dir}/*.py
done
-echo "Running unittest on tests/."
-python3 -m unittest tests/*.py
+echo "Running unittest-parallel on tests/."
+unittest-parallel -t . -s tests/ -p '*.py'
+set +e
+rm test_db:*.*
+set -e
exit 0
{% endblock %}
</style>
<body>
-<a href="processes">processes</a>
-<a href="conditions">conditions</a>
<a href="day">today</a>
<a href="calendar">calendar</a>
+<a href="conditions">conditions</a>
+<a href="processes">processes</a>
+<a href="todos">todos</a>
<hr>
{% block content %}
{% endblock %}
-{% macro simple_checkbox_table(title, items, type_name, list_name, add_string="add") %}
+{% macro simple_checkbox_table(title, items, type_name, list_name, add_string="add", historical=false) %}
<table>
{% for item in items %}
<tr>
<input type="checkbox" name="{{title}}" value="{{item.id_}}" checked />
</td>
<td>
-<a href="{{type_name}}?id={{item.id_}}">{{item.title.newest|e}}</a>
+<a href="{{type_name}}?id={{item.id_}}">{% if historical %}{{item.title_then}}{% else %}{{item.title.newest|e}}{% endif %}</a>
</td>
</tr>
{% endfor %}
td.day_name {
padding-right: 0.5em;
}
+td.today {
+ font-weight: bold;
+}
{% endblock %}
<tr>
<td class="day_name">{{day.weekday|truncate(2,True,'',0)}}</td>
-<td><a href="day?date={{day.date}}">{{day.date}}</a></td>
+<td{% if day.date == today %} class="today"{% endif %}><a href="day?date={{day.date}}">{{day.date}}</a></td>
<td>{{day.comment|e}}</td>
</tr>
{% for todo in day.calendarized_todos %}
<tr>
<td>[{% if todo.is_done %}X{% else %} {% endif %}]</td>
-<td><a href="todo?id={{todo.id_}}">{{todo.title.newest|e}}</td>
+<td><a href="todo?id={{todo.id_}}">{{todo.title_then|e}}</td>
<td>{{todo.comment|e}}</td>
</tr>
{% endfor %}
<td><textarea name="description">{{condition.description.newest|e}}</textarea>{% if condition.id_ %} [<a href="condition_descriptions?id={{condition.id_}}">history</a>]{% endif %}</td>
<tr/>
+<tr>
+<th>enables</th>
+<td>
+{% for process in enabled_processes %}
+<a href="process?id={{process.id_}}">{{process.title.newest|e}}</a><br />
+{% endfor %}
+</td>
+</tr>
+
+<tr>
+<th>disables</th>
+<td>
+{% for process in disabled_processes %}
+<a href="process?id={{process.id_}}">{{process.title.newest|e}}</a><br />
+{% endfor %}
+</td>
+</tr>
+
+<tr>
+<th>enabled by</th>
+<td>
+{% for process in enabling_processes %}
+<a href="process?id={{process.id_}}">{{process.title.newest|e}}</a><br />
+{% endfor %}
+</td>
+</tr>
+
+<tr>
+<th>disabled by</th>
+<td>
+{% for process in disabling_processes %}
+<a href="process?id={{process.id_}}">{{process.title.newest|e}}</a><br />
+{% endfor %}
+</td>
+</tr>
+
</table>
{{ macros.edit_buttons() }}
{% endblock %}
{% block content %}
<h3>conditions</h3>
+<form action="conditions" method="GET">
+<input type="submit" value="filter" />
+<input name="pattern" value="{{pattern}}" />
+</form>
+
<table>
<tr>
<th><a href="?sort_by={% if sort_by == "is_active" %}-{% endif %}is_active">active</a></th>
{% macro show_node_undone(node, indent) %}
{% if not node.todo.is_done %}
<tr>
+{% if not node.seen %}
<input type="hidden" name="todo_id" value="{{node.todo.id_}}" />
+{% endif %}
{% for condition in conditions_present %}
-<td class="cond_line_{{loop.index0 % 3}} {% if not condition.is_active %}min_width{% endif %}">{% if condition in node.todo.conditions %}{% if not condition.is_active %}O{% endif %}{% endif %}</td>
+<td class="cond_line_{{loop.index0 % 3}} {% if not condition.is_active %}min_width{% endif %}">{% if condition in node.todo.conditions and not condition.is_active %}O{% elif condition in node.todo.blockers and condition.is_active %}!{% endif %}</td>
{% endfor %}
<td class="todo_line">-></td>
-<td class="todo_line"><input name="done" type="checkbox" value="{{node.todo.id_}}" {% if node.todo.is_done %}checked disabled{% endif %} {% if not node.todo.is_doable %}disabled{% endif %}/></td>
-<td class="todo_line"><input name="effort" type="number" step=0.1 size=5 placeholder={{node.todo.process.effort.newest }} value={{node.todo.effort}} /></td>
+{% if node.seen %}
+<td class="todo_line"></td>
+<td class="todo_line">{% if node.todo.effort %}{{ node.todo.effort }}{% endif %}</td>
+{% else %}
+<td class="todo_line"><input name="done" type="checkbox" value="{{node.todo.id_}}" {% if not node.todo.is_doable %}disabled{% endif %}/></td>
+<td class="todo_line"><input name="effort" type="number" step=0.1 size=5 placeholder={{node.todo.effort_then}} value={{node.todo.effort}} /></td>
+{% endif %}
<td class="todo_line">
{% for i in range(indent) %} {% endfor %} +
-{% if node.seen %}({% endif %}<a href="todo?id={{node.todo.id_}}">{{node.todo.process.title.newest|e}}</a>{% if node.seen %}){% endif %}
+{% if node.seen %}({% endif %}<a href="todo?id={{node.todo.id_}}">{{node.todo.title_then|e}}</a>{% if node.seen %}){% endif %}
</td>
<td class="todo_line">-></td>
<td class="cond_line_{{(conditions_present|length - loop.index) % 3}} {% if condition in node.todo.enables or condition in node.todo.disables %}min_width{% endif %}">{% if condition in node.todo.enables %}+{% elif condition in node.todo.disables %}!{% endif %}</td>
{% endfor %}
-<td><input name="comment" value="{{node.todo.comment|e}}" /></td>
+<td>
+{% if node.seen %}
+{{node.todo.comment|e}}
+{% else %}
+<input name="comment" value="{{node.todo.comment|e}}" />
+{% endif %}
+</td>
</tr>
{% endif %}
<tr>
{% if path|length > 0 and not path[-1].todo.is_done %}
<td>
-({% for path_node in path %}<a href="todo?id={{path_node.todo.id_}}">{{path_node.todo.process.title.newest|e}}</a> <- {% endfor %})
+({% for path_node in path %}<a href="todo?id={{path_node.todo.id_}}">{{path_node.todo.title_then|e}}</a> <- {% endfor %})
</td>
</tr>
<td>
{% for i in range(indent) %} {% endfor %} +
{% endif %}
-{% if node.seen %}({% endif %}<a href="todo?id={{node.todo.id_}}">{{node.todo.process.title.newest|e}}</a> {% if node.todo.comment|length > 0 %}[{{node.todo.comment|e}}]{% endif %}{% if node.seen %}){% endif %}
+{% if node.seen %}({% endif %}<a href="todo?id={{node.todo.id_}}">{{node.todo.title_then|e}}</a> {% if node.todo.comment|length > 0 %}[{{node.todo.comment|e}}]{% endif %}{% if node.seen %}){% endif %}
</td>
</tr>
<tr>
<th colspan={{ conditions_present|length}}>c</th>
-<th colspan=4>states</th>
+<th colspan=5>states</th>
<th colspan={{ conditions_present|length}}>t</th>
<th>add enabler</th>
+<th>add disabler</th>
</tr>
{% for condition in conditions_present %}
{% endfor %}
<td class="cond_line_{{loop.index0 % 3}}">[{% if condition.is_active %}X{% else %} {% endif %}]</td>
-<td colspan=3 class="cond_line_{{loop.index0 % 3}}"><a href="condition?id={{condition.id_}}">{{condition.title.newest|e}}</a></td>
+<td colspan=4 class="cond_line_{{loop.index0 % 3}}"><a href="condition?id={{condition.id_}}">{{condition.title.at(day.date)|e}}</a></td>
{% for _ in conditions_present %}
{% if outer_loop.index0 + loop.index0 < conditions_present|length %}
<td><input name="new_todo" list="{{list_name}}" autocomplete="off" /></td>
{{ macros.datalist_of_titles(list_name, enablers_for[condition.id_]) }}
</td>
+{% set list_name = "todos_against_%s"|format(condition.id_) %}
+<td><input name="new_todo" list="{{list_name}}" autocomplete="off" /></td>
+{{ macros.datalist_of_titles(list_name, disablers_for[condition.id_]) }}
+</td>
</tr>
{% endfor %}
{% for condition in conditions_present %}
<td class="cond_line_{{loop.index0 % 3}}"></td>
{% endfor %}
-<th colspan={{ 4 }}>doables</th>
+<th colspan=5>doables</th>
{% for condition in conditions_present %}
<td class="cond_line_{{(conditions_present|length - loop.index) % 3}}"></td>
{% endfor %}
</td>
<td>
{% if step_node.is_explicit %}
-add: <input name="new_step_to_{{step_id}}" list="candidates" autocomplete="off" />
+add sub-step: <input name="new_step_to_{{step_id}}" list="candidates" autocomplete="off" />
{% endif %}
</td>
</tr>
<td>{{ macros.simple_checkbox_table("condition", process.conditions, "condition", "condition_candidates") }}</td>
</tr>
+<tr>
+<th>blockers</th>
+<td>{{ macros.simple_checkbox_table("blocker", process.blockers, "condition", "condition_candidates") }}</td>
+</tr>
+
<tr>
<th>enables</th>
<td>{{ macros.simple_checkbox_table("enables", process.enables, "condition", "condition_candidates") }}</td>
</td>
<tr>
+<tr>
+<th>todos</th>
+<td>
+<a href="todos?process_id={{process.id_}}">{{n_todos}}</a><br />
+</td>
+<tr>
+
</table>
{{ macros.edit_buttons() }}
</form>
{% block content %}
<h3>processes</h3>
+<form action="processes" method="GET">
+<input type="submit" value="filter" />
+<input name="pattern" value="{{pattern}}" />
+</form>
+
<table>
<tr>
<th><a href="?sort_by={% if sort_by == "steps" %}-{% endif %}steps">steps</a></th>
{% block content %}
-<h3>Todo: {{todo.process.title.newest|e}}</h3>
+<h3>Todo: {{todo.title_then|e}}</h3>
<form action="todo?id={{todo.id_}}" method="POST">
<table>
<tr>
<th>effort</th>
-<td><input type="number" name="effort" step=0.1 size=5 placeholder={{todo.process.effort.newest }} value={{ todo.effort }} /><br /></td>
+<td><input type="number" name="effort" step=0.1 size=5 placeholder={{todo.effort_then}} value={{todo.effort}} /><br /></td>
</tr>
<tr>
<tr>
<th>conditions</th>
-<td>{{ macros.simple_checkbox_table("condition", todo.conditions, "condition", "condition_candidates") }}</td>
+<td>{{ macros.simple_checkbox_table("condition", todo.conditions, "condition", "condition_candidates", historical=true) }}</td>
+</tr>
+
+<tr>
+<th>blockers</th>
+<td>{{ macros.simple_checkbox_table("blocker", todo.blockers, "condition", "condition_candidates", historical=true) }}</td>
</tr>
<tr>
<th>enables</th>
-<td>{{ macros.simple_checkbox_table("enables", todo.enables, "condition", "condition_candidates") }}</td>
+<td>{{ macros.simple_checkbox_table("enables", todo.enables, "condition", "condition_candidates", historical=true) }}</td>
</tr>
<tr>
<th>disables</th>
-<td>{{ macros.simple_checkbox_table("disables", todo.disables, "condition", "condition_candidates") }}</td>
+<td>{{ macros.simple_checkbox_table("disables", todo.disables, "condition", "condition_candidates", historical=true) }}</td>
</tr>
<tr>
<th>parents</th>
<td>
{% for parent in todo.parents %}
-<a href="todo?id={{parent.id_}}">{{parent.process.title.newest|e}}</a><br />
+<a href="todo?id={{parent.id_}}">{{parent.title_then|e}}</a><br />
{% endfor %}
</td>
</tr>
<tr>
<th>children</th>
-<td>{{ macros.simple_checkbox_table("adopt", todo.children, "adopt", "todo_candidates", "adopt") }}</td>
+<td>{{ macros.simple_checkbox_table("adopt", todo.children, "adopt", "todo_candidates", "adopt", true) }}</td>
</tr>
</table>
--- /dev/null
+{% extends '_base.html' %}
+{% import '_macros.html' as macros %}
+
+
+
+{% block content %}
+<h3>todos</h3>
+
+<form action="todos" method="GET">
+<input type="submit" value="filter" />
+process <input name="process_id" value="{{process_id or ''}}" list="processes" />
+from <input name="start" value="{{start}}" />
+to <input name="end" value="{{end}}" />
+in comment <input name="comment_pattern" value="{{comment_pattern}}" />
+<input type="submit" value="OK" />
+</form>
+
+<table>
+<tr>
+<th><a href="?sort_by={% if sort_by == "doneness" %}-{% endif %}doneness">done</a></th>
+<th><a href="?sort_by={% if sort_by == "date" %}-{% endif %}date">date</a></th>
+<th><a href="?sort_by={% if sort_by == "process" %}-{% endif %}process">process</a></th>
+<th><a href="?sort_by={% if sort_by == "comment" %}-{% endif %}comment">comment</a></th>
+</tr>
+{% for todo in todos %}
+<tr>
+<td>[{% if todo.is_done %}x{% else %} {% endif %}]</td>
+<td><a href="{{todo.date}}">{{todo.date}}</a></td>
+<td><a href="process?id={{todo.process.id_}}">{{todo.title_then}}</a></td>
+<td><a href="{{todo.comment}}">{{todo.comment}}</a></td>
+</tr>
+{% endfor %}
+</table>
+{{ macros.datalist_of_titles("processes", all_processes) }}
+{% endblock %}
+
from unittest import TestCase
from datetime import datetime
from tests.utils import TestCaseWithDB, TestCaseWithServer
-from plomtask.days import Day, todays_date
+from plomtask.dating import date_in_n_days
+from plomtask.days import Day
from plomtask.exceptions import BadFormatException
"""Test .by_id()."""
self.check_by_id()
- def test_Day_all(self) -> None:
- """Test Day.all(), especially in regards to date range filtering."""
+ def test_Day_by_date_range_filled(self) -> None:
+ """Test Day.by_date_range_filled."""
date1, date2, date3 = self.default_ids
day1, day2, day3 = self.check_all()
- self.assertEqual(Day.all(self.db_conn, ('', '')),
- [day1, day2, day3])
# check date range is a closed interval
- self.assertEqual(Day.all(self.db_conn, (date1, date3)),
+ self.assertEqual(Day.by_date_range_filled(self.db_conn, date1, date3),
[day1, day2, day3])
# check first date range value excludes what's earlier
- self.assertEqual(Day.all(self.db_conn, (date2, date3)),
+ self.assertEqual(Day.by_date_range_filled(self.db_conn, date2, date3),
[day2, day3])
- self.assertEqual(Day.all(self.db_conn, (date3, '')),
- [day3])
# check second date range value excludes what's later
- self.assertEqual(Day.all(self.db_conn, ('', date2)),
+ self.assertEqual(Day.by_date_range_filled(self.db_conn, date1, date2),
[day1, day2])
# check swapped (impossible) date range returns emptiness
- self.assertEqual(Day.all(self.db_conn, (date3, date1)),
+ self.assertEqual(Day.by_date_range_filled(self.db_conn, date3, date1),
[])
# check fill_gaps= instantiates unsaved dates within date range
# (but does not store them)
- day4 = Day('2024-01-04')
day5 = Day('2024-01-05')
day6 = Day('2024-01-06')
day6.save(self.db_conn)
- self.assertEqual(Day.all(self.db_conn, (date2, '2024-01-07'),
- fill_gaps=True),
- [day2, day3, day4, day5, day6])
+ day7 = Day('2024-01-07')
+ self.assertEqual(Day.by_date_range_filled(self.db_conn,
+ day5.date, day7.date),
+ [day5, day6, day7])
self.check_storage([day1, day2, day3, day6])
# check 'today' is interpreted as today's date
- today = Day(todays_date())
+ today = Day(date_in_n_days(0))
today.save(self.db_conn)
- self.assertEqual(Day.all(self.db_conn, ('today', 'today')), [today])
+ self.assertEqual(Day.by_date_range_filled(self.db_conn,
+ 'today', 'today'),
+ [today])
def test_Day_remove(self) -> None:
"""Test .remove() effects on DB and cache."""
self.check_post(form_data, '/process?id=6', 404)
self.check_post(form_data, '/process?id=5', 302, '/processes')
+ def test_do_POST_process_steps(self) -> None:
+ """Test behavior of ProcessStep posting."""
+ form_data_1 = self.post_process(1)
+ self.post_process(2)
+ self.post_process(3)
+ form_data_1['new_top_step'] = [2]
+ self.post_process(1, form_data_1)
+ retrieved_process = Process.by_id(self.db_conn, 1)
+ self.assertEqual(len(retrieved_process.explicit_steps), 1)
+ retrieved_step = retrieved_process.explicit_steps[0]
+ self.assertEqual(retrieved_step.step_process_id, 2)
+ self.assertEqual(retrieved_step.owner_id, 1)
+ self.assertEqual(retrieved_step.parent_step_id, None)
+ form_data_1['new_top_step'] = []
+ self.post_process(1, form_data_1)
+ retrieved_process = Process.by_id(self.db_conn, 1)
+ self.assertEqual(retrieved_process.explicit_steps, [])
+ with self.assertRaises(NotFoundException):
+ ProcessStep.by_id(self.db_conn, retrieved_step.id_)
+ form_data_1['new_top_step'] = [3]
+ self.post_process(1, form_data_1)
+ retrieved_process = Process.by_id(self.db_conn, 1)
+ retrieved_step = retrieved_process.explicit_steps[0]
+ self.assertEqual(retrieved_step.step_process_id, 3)
+ self.assertEqual(retrieved_step.owner_id, 1)
+ self.assertEqual(retrieved_step.parent_step_id, None)
+ form_data_1['new_top_step'] = []
+ form_data_1['steps'] = [retrieved_step.id_]
+ self.post_process(1, form_data_1)
+ retrieved_process = Process.by_id(self.db_conn, 1)
+ self.assertEqual(retrieved_process.explicit_steps, [])
+ form_data_1['steps'] = []
+ form_data_1['keep_step'] = [retrieved_step.id_]
+ self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
+ form_data_1['steps'] = [retrieved_step.id_]
+ form_data_1['keep_step'] = [retrieved_step.id_]
+ self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
+ form_data_1[f'step_{retrieved_step.id_}_process_id'] = [2]
+ self.post_process(1, form_data_1)
+ retrieved_process = Process.by_id(self.db_conn, 1)
+ self.assertEqual(len(retrieved_process.explicit_steps), 1)
+ retrieved_step = retrieved_process.explicit_steps[0]
+ self.assertEqual(retrieved_step.step_process_id, 2)
+ self.assertEqual(retrieved_step.owner_id, 1)
+ self.assertEqual(retrieved_step.parent_step_id, None)
+ form_data_1['new_top_step'] = ['foo']
+ form_data_1['steps'] = []
+ form_data_1['keep_step'] = []
+ self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
+ retrieved_process = Process.by_id(self.db_conn, 1)
+ self.assertEqual(len(retrieved_process.explicit_steps), 1)
+
def test_do_GET(self) -> None:
"""Test /process and /processes response codes."""
- self.post_process()
self.check_get_defaults('/process')
self.check_get('/processes', 200)
t2.save(self.db_conn)
self.assertEqual(Todo.by_date(self.db_conn, self.date1), [t1, t2])
self.assertEqual(Todo.by_date(self.db_conn, self.date2), [])
- self.assertEqual(Todo.by_date(self.db_conn, 'foo'), [])
+ with self.assertRaises(BadFormatException):
+ self.assertEqual(Todo.by_date(self.db_conn, 'foo'), [])
def test_Todo_on_conditions(self) -> None:
"""Test effect of Todos on Conditions."""
self.assertEqual(todo_3.children, [])
self.assertEqual(todo_4.process, proc3)
self.assertEqual(todo_4.parents, [todo_2])
- # test .make_missing_children doesn't further than top-level
- self.assertEqual(todo_4.children, [])
# test .make_missing_children lower down the tree
todo_4.make_missing_children(self.db_conn)
todo_5 = Todo.by_id(self.db_conn, 5)
Todo.by_id(self.db_conn, todo_1.id_)
self.assertEqual(todo_0.children, [])
self.assertEqual(todo_2.parents, [])
+ todo_2.comment = 'foo'
+ with self.assertRaises(HandledException):
+ todo_2.remove(self.db_conn)
+ todo_2.comment = ''
+ todo_2.effort = 5
+ with self.assertRaises(HandledException):
+ todo_2.remove(self.db_conn)
+
+ def test_Todo_autoremoval(self) -> None:
+ """"Test automatic removal for Todo.effort < 0."""
+ todo_1 = Todo(None, self.proc, False, self.date1)
+ todo_1.save(self.db_conn)
+ todo_1.comment = 'foo'
+ todo_1.effort = -0.1
+ todo_1.save(self.db_conn)
+ Todo.by_id(self.db_conn, todo_1.id_)
+ todo_1.comment = ''
+ todo_1.save(self.db_conn)
+ with self.assertRaises(NotFoundException):
+ Todo.by_id(self.db_conn, todo_1.id_)
class TestsWithServer(TestCaseWithServer):
def test_do_POST_day_todo_multiple_inner_adoption(self) -> None:
"""Test multiple Todos can be posted to Day view w. inner adoption."""
+
+ def key_order_func(t: Todo) -> int:
+ assert isinstance(t.process.id_, int)
+ return t.process.id_
+
+ def check_adoption(date: str, new_todo: list[int]) -> None:
+ form_data = {'day_comment': '', 'new_todo': new_todo}
+ self.check_post(form_data, f'/day?date={date}', 302)
+ day_todos = Todo.by_date(self.db_conn, date)
+ day_todos.sort(key=key_order_func)
+ todo1 = day_todos[0]
+ todo2 = day_todos[1]
+ self.assertEqual(todo1.children, [])
+ self.assertEqual(todo1.parents, [todo2])
+ self.assertEqual(todo2.children, [todo1])
+ self.assertEqual(todo2.parents, [])
+
+ def check_nesting_adoption(process_id: int, date: str,
+ new_top_steps: list[int]) -> None:
+ result_reversed = new_top_steps[1] < new_top_steps[0]
+ form_data = self.post_process()
+ form_data = self.post_process(process_id,
+ form_data |
+ {'new_top_step': new_top_steps})
+ form_data = {'day_comment': '', 'new_todo': [process_id]}
+ self.check_post(form_data, f'/day?date={date}', 302)
+ day_todos = Todo.by_date(self.db_conn, date)
+ day_todos.sort(key=key_order_func, reverse=True)
+ self.assertEqual(len(day_todos), 3)
+ todo1 = day_todos[0] # process of process_id
+ todo2 = day_todos[1] # process 2
+ todo3 = day_todos[2] # process 1
+ if result_reversed:
+ self.assertEqual(todo1.children, [todo2, todo3])
+ else:
+ self.assertEqual(todo1.children, [todo3, todo2])
+ self.assertEqual(todo1.parents, [])
+ self.assertEqual(todo2.children, [todo3])
+ self.assertEqual(todo2.parents, [todo1])
+ self.assertEqual(todo3.children, [])
+ self.assertEqual(todo3.parents, [todo2, todo1])
+
form_data = self.post_process()
form_data = self.post_process(2, form_data | {'new_top_step': 1})
- form_data = {'day_comment': '', 'new_todo': [1, 2]}
- self.check_post(form_data, '/day?date=2024-01-01', 302)
- todo1 = Todo.by_date(self.db_conn, '2024-01-01')[0]
- todo2 = Todo.by_date(self.db_conn, '2024-01-01')[1]
- self.assertEqual(todo1.children, [])
- self.assertEqual(todo1.parents, [todo2])
- self.assertEqual(todo2.children, [todo1])
- self.assertEqual(todo2.parents, [])
- # check process ID order does not affect end result
- form_data = {'day_comment': '', 'new_todo': [2, 1]}
- self.check_post(form_data, '/day?date=2024-01-02', 302)
- todo1 = Todo.by_date(self.db_conn, '2024-01-02')[1]
- todo2 = Todo.by_date(self.db_conn, '2024-01-02')[0]
- self.assertEqual(todo1.children, [])
- self.assertEqual(todo1.parents, [todo2])
- self.assertEqual(todo2.children, [todo1])
- self.assertEqual(todo2.parents, [])
+ check_adoption('2024-01-01', [1, 2])
+ check_adoption('2024-01-02', [2, 1])
+ check_nesting_adoption(3, '2024-01-03', [1, 2])
+ check_nesting_adoption(4, '2024-01-04', [2, 1])
def test_do_POST_day_todo_doneness(self) -> None:
"""Test Todo doneness can be posted to Day view."""
"""POST basic Process."""
if not form_data:
form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1}
- self.check_post(form_data, '/process?id=', 302, f'/process?id={id_}')
+ self.check_post(form_data, f'/process?id={id_}', 302,
+ f'/process?id={id_}')
return form_data