--- /dev/null
+CREATE TABLE condition_descriptions (
+ parent INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ description TEXT NOT NULL,
+ PRIMARY KEY (parent, timestamp),
+ FOREIGN KEY (parent) REFERENCES conditions(id)
+);
+CREATE TABLE condition_titles (
+ parent INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ title TEXT NOT NULL,
+ PRIMARY KEY (parent, timestamp),
+ FOREIGN KEY (parent) REFERENCES conditions(id)
+);
+CREATE TABLE conditions (
+ id INTEGER PRIMARY KEY,
+ is_active BOOLEAN NOT NULL
+);
+CREATE TABLE days (
+ id TEXT PRIMARY KEY,
+ comment TEXT NOT NULL
+);
+CREATE TABLE process_conditions (
+ process INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY (process, condition),
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE process_descriptions (
+ parent INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ description TEXT NOT NULL,
+ PRIMARY KEY (parent, timestamp),
+ FOREIGN KEY (parent) REFERENCES processes(id)
+);
+CREATE TABLE process_disables (
+ process INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(process, condition),
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE process_efforts (
+ parent INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ effort REAL NOT NULL,
+ PRIMARY KEY (parent, timestamp),
+ FOREIGN KEY (parent) REFERENCES processes(id)
+);
+CREATE TABLE process_enables (
+ process INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(process, condition),
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE process_steps (
+ id INTEGER PRIMARY KEY,
+ owner INTEGER NOT NULL,
+ step_process INTEGER NOT NULL,
+ parent_step INTEGER,
+ FOREIGN KEY (owner) REFERENCES processes(id),
+ FOREIGN KEY (step_process) REFERENCES processes(id),
+ FOREIGN KEY (parent_step) REFERENCES process_steps(step_id)
+);
+CREATE TABLE process_titles (
+ parent INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ title TEXT NOT NULL,
+ PRIMARY KEY (parent, timestamp),
+ FOREIGN KEY (parent) REFERENCES processes(id)
+);
+CREATE TABLE processes (
+ id INTEGER PRIMARY KEY
+);
+CREATE TABLE todo_children (
+ parent INTEGER NOT NULL,
+ child INTEGER NOT NULL,
+ PRIMARY KEY (parent, child),
+ FOREIGN KEY (parent) REFERENCES todos(id),
+ FOREIGN KEY (child) REFERENCES todos(id)
+);
+CREATE TABLE todo_conditions (
+ todo INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(todo, condition),
+ FOREIGN KEY (todo) REFERENCES todos(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE todo_disables (
+ todo INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(todo, condition),
+ FOREIGN KEY (todo) REFERENCES todos(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE todo_enables (
+ todo INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(todo, condition),
+ FOREIGN KEY (todo) REFERENCES todos(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE todos (
+ id INTEGER PRIMARY KEY,
+ process INTEGER NOT NULL,
+ is_done BOOLEAN NOT NULL,
+ day TEXT NOT NULL,
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (day) REFERENCES days(id)
+);
--- /dev/null
+ALTER TABLE todos ADD COLUMN comment TEXT NOT NULL DEFAULT "";
--- /dev/null
+ALTER TABLE todos ADD COLUMN effort REAL;
--- /dev/null
+ALTER TABLE todos ADD COLUMN calendarize BOOLEAN NOT NULL DEFAULT FALSE;
+ALTER TABLE processes ADD COLUMN calendarize BOOLEAN NOT NULL DEFAULT FALSE;
--- /dev/null
+CREATE TABLE process_blockers (
+ process INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY (process, condition),
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE todo_blockers (
+ todo INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY (todo, condition),
+ FOREIGN KEY (todo) REFERENCES todos(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
--- /dev/null
+CREATE TABLE condition_descriptions (
+ parent INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ description TEXT NOT NULL,
+ PRIMARY KEY (parent, timestamp),
+ FOREIGN KEY (parent) REFERENCES conditions(id)
+);
+CREATE TABLE condition_titles (
+ parent INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ title TEXT NOT NULL,
+ PRIMARY KEY (parent, timestamp),
+ FOREIGN KEY (parent) REFERENCES conditions(id)
+);
+CREATE TABLE conditions (
+ id INTEGER PRIMARY KEY,
+ is_active BOOLEAN NOT NULL
+);
+CREATE TABLE days (
+ id TEXT PRIMARY KEY,
+ comment TEXT NOT NULL
+);
+CREATE TABLE process_blockers (
+ process INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY (process, condition),
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE process_conditions (
+ process INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY (process, condition),
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE process_descriptions (
+ parent INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ description TEXT NOT NULL,
+ PRIMARY KEY (parent, timestamp),
+ FOREIGN KEY (parent) REFERENCES processes(id)
+);
+CREATE TABLE process_disables (
+ process INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(process, condition),
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE process_efforts (
+ parent INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ effort REAL NOT NULL,
+ PRIMARY KEY (parent, timestamp),
+ FOREIGN KEY (parent) REFERENCES processes(id)
+);
+CREATE TABLE process_enables (
+ process INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(process, condition),
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE process_steps (
+ id INTEGER PRIMARY KEY,
+ owner INTEGER NOT NULL,
+ step_process INTEGER NOT NULL,
+ parent_step INTEGER,
+ FOREIGN KEY (owner) REFERENCES processes(id),
+ FOREIGN KEY (step_process) REFERENCES processes(id),
+ FOREIGN KEY (parent_step) REFERENCES process_steps(step_id)
+);
+CREATE TABLE process_titles (
+ parent INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ title TEXT NOT NULL,
+ PRIMARY KEY (parent, timestamp),
+ FOREIGN KEY (parent) REFERENCES processes(id)
+);
+CREATE TABLE processes (
+ id INTEGER PRIMARY KEY,
+ calendarize BOOLEAN NOT NULL DEFAULT FALSE
+);
+CREATE TABLE todo_blockers (
+ todo INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY (todo, condition),
+ FOREIGN KEY (todo) REFERENCES todos(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE todo_children (
+ parent INTEGER NOT NULL,
+ child INTEGER NOT NULL,
+ PRIMARY KEY (parent, child),
+ FOREIGN KEY (parent) REFERENCES todos(id),
+ FOREIGN KEY (child) REFERENCES todos(id)
+);
+CREATE TABLE todo_conditions (
+ todo INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(todo, condition),
+ FOREIGN KEY (todo) REFERENCES todos(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE todo_disables (
+ todo INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(todo, condition),
+ FOREIGN KEY (todo) REFERENCES todos(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE todo_enables (
+ todo INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(todo, condition),
+ FOREIGN KEY (todo) REFERENCES todos(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE todos (
+ id INTEGER PRIMARY KEY,
+ process INTEGER NOT NULL,
+ is_done BOOLEAN NOT NULL,
+ day TEXT NOT NULL,
+ comment TEXT NOT NULL DEFAULT "",
+ effort REAL,
+ calendarize BOOLEAN NOT NULL DEFAULT FALSE,
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (day) REFERENCES days(id)
+);
table_name = 'conditions'
to_save = ['is_active']
to_save_versioned = ['title', 'description']
+ to_search = ['title.newest', 'description.newest']
def __init__(self, id_: int | None, is_active: bool = False) -> None:
super().__init__(id_)
if self.id_ is None:
raise HandledException('cannot remove unsaved item')
for item in ('process', 'todo'):
- for attr in ('conditions', 'enables', 'disables'):
+ for attr in ('conditions', 'blockers', 'enables', 'disables'):
table_name = f'{item}_{attr}'
for _ in db_conn.row_where(table_name, 'condition', self.id_):
raise HandledException('cannot remove Condition in use')
class ConditionsRelations:
"""Methods for handling relations to Conditions, for Todo and Process."""
+ def __init__(self) -> None:
+ self.conditions: list[Condition] = []
+ self.blockers: list[Condition] = []
+ self.enables: list[Condition] = []
+ self.disables: list[Condition] = []
+
def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
target: str = 'conditions') -> None:
"""Set self.[target] to Conditions identified by ids."""
for id_ in ids:
target_list += [Condition.by_id(db_conn, id_)]
+ def set_blockers(self, db_conn: DatabaseConnection,
+ ids: list[int]) -> None:
+ """Set self.enables to Conditions identified by ids."""
+ self.set_conditions(db_conn, ids, 'blockers')
+
def set_enables(self, db_conn: DatabaseConnection,
ids: list[int]) -> None:
"""Set self.enables to Conditions identified by ids."""
--- /dev/null
+"""Various utilities for handling dates."""
+from datetime import datetime, timedelta
+from plomtask.exceptions import BadFormatException
+
+DATE_FORMAT = '%Y-%m-%d'
+
+
+def valid_date(date_str: str) -> str:
+ """Validate date against DATE_FORMAT or 'today'/'yesterday'/'tomorrow.
+
+ In any case, returns in DATE_FORMAT.
+ """
+ if date_str == 'today':
+ date_str = date_in_n_days(0)
+ elif date_str == 'yesterday':
+ date_str = date_in_n_days(-1)
+ elif date_str == 'tomorrow':
+ date_str = date_in_n_days(1)
+ try:
+ dt = datetime.strptime(date_str, DATE_FORMAT)
+ except (ValueError, TypeError) as e:
+ msg = f'Given date of wrong format: {date_str}'
+ raise BadFormatException(msg) from e
+ return dt.strftime(DATE_FORMAT)
+
+
+def date_in_n_days(n: int) -> str:
+ """Return in DATE_FORMAT date from today + n days."""
+ date = datetime.now() + timedelta(days=n)
+ return date.strftime(DATE_FORMAT)
"""Collecting Day and date-related items."""
from __future__ import annotations
from datetime import datetime, timedelta
-from plomtask.exceptions import BadFormatException
from plomtask.db import DatabaseConnection, BaseModel
-
-DATE_FORMAT = '%Y-%m-%d'
-MIN_RANGE_DATE = '2024-01-01'
-MAX_RANGE_DATE = '2030-12-31'
-
-
-def valid_date(date_str: str) -> str:
- """Validate date against DATE_FORMAT or 'today', return in DATE_FORMAT."""
- if date_str == 'today':
- date_str = todays_date()
- try:
- dt = datetime.strptime(date_str, DATE_FORMAT)
- except (ValueError, TypeError) as e:
- msg = f'Given date of wrong format: {date_str}'
- raise BadFormatException(msg) from e
- return dt.strftime(DATE_FORMAT)
-
-
-def todays_date() -> str:
- """Return current date in DATE_FORMAT."""
- return datetime.now().strftime(DATE_FORMAT)
+from plomtask.todos import Todo
+from plomtask.dating import (DATE_FORMAT, valid_date)
class Day(BaseModel[str]):
super().__init__(id_)
self.datetime = datetime.strptime(self.date, DATE_FORMAT)
self.comment = comment
+ self.calendarized_todos: list[Todo] = []
def __lt__(self, other: Day) -> bool:
return self.date < other.date
@classmethod
- def all(cls, db_conn: DatabaseConnection,
- date_range: tuple[str, str] = ('', ''),
- fill_gaps: bool = False) -> list[Day]:
- """Return list of Days in database within (open) date_range interval.
-
- If no range values provided, defaults them to MIN_RANGE_DATE and
- MAX_RANGE_DATE. Also knows to properly interpret 'today' as value.
+ def by_date_range_filled(cls, db_conn: DatabaseConnection,
+ start: str, end: str) -> list[Day]:
+ """Return days existing and non-existing between dates start/end."""
+ ret = cls.by_date_range_with_limits(db_conn, (start, end), 'id')
+ days, start_date, end_date = ret
+ return cls.with_filled_gaps(days, start_date, end_date)
- On fill_gaps=True, will instantiate (without saving) Days of all dates
- within the date range that don't exist yet.
- """
- min_date = '2024-01-01'
- max_date = '2030-12-31'
- start_date = valid_date(date_range[0] if date_range[0] else min_date)
- end_date = valid_date(date_range[1] if date_range[1] else max_date)
- days = []
- sql = 'SELECT id FROM days WHERE id >= ? AND id <= ?'
- for row in db_conn.exec(sql, (start_date, end_date)):
- days += [cls.by_id(db_conn, row[0])]
+ @classmethod
+ def with_filled_gaps(cls, days: list[Day], start_date: str, end_date: str
+ ) -> list[Day]:
+ """In days, fill with (un-saved) Days gaps between start/end_date."""
+ if start_date > end_date:
+ return days
days.sort()
- if fill_gaps and len(days) > 1:
+ if start_date not in [d.date for d in days]:
+ days[:] = [Day(start_date)] + days
+ if end_date not in [d.date for d in days]:
+ days += [Day(end_date)]
+ if len(days) > 1:
gapless_days = []
for i, day in enumerate(days):
gapless_days += [day]
while day.next_date != days[i+1].date:
day = Day(day.next_date)
gapless_days += [day]
- days = gapless_days
+ days[:] = gapless_days
return days
@property
assert isinstance(self.id_, str)
return self.id_
+ @property
+ def first_of_month(self) -> bool:
+ """Return what month self.date is part of."""
+ assert isinstance(self.id_, str)
+ return self.id_[-2:] == '01'
+
+ @property
+ def month_name(self) -> str:
+ """Return what month self.date is part of."""
+ return self.datetime.strftime('%B')
+
@property
def weekday(self) -> str:
"""Return what weekday matches self.date."""
"""Return date succeeding date of this Day."""
next_datetime = self.datetime + timedelta(days=1)
return next_datetime.strftime(DATE_FORMAT)
+
+ def collect_calendarized_todos(self, db_conn: DatabaseConnection) -> None:
+ """Fill self.calendarized_todos."""
+ self.calendarized_todos = [t for t in Todo.by_date(db_conn, self.date)
+ if t.calendarize]
"""Database management."""
from __future__ import annotations
+from os import listdir
from os.path import isfile
from difflib import Differ
from sqlite3 import connect as sql_connect, Cursor, Row
from typing import Any, Self, TypeVar, Generic
from plomtask.exceptions import HandledException, NotFoundException
+from plomtask.dating import valid_date
-PATH_DB_SCHEMA = 'scripts/init.sql'
-EXPECTED_DB_VERSION = 0
+EXPECTED_DB_VERSION = 4
+MIGRATIONS_DIR = 'migrations'
+FILENAME_DB_SCHEMA = f'init_{EXPECTED_DB_VERSION}.sql'
+PATH_DB_SCHEMA = f'{MIGRATIONS_DIR}/{FILENAME_DB_SCHEMA}'
+
+
+class UnmigratedDbException(HandledException):
+ """To identify case of unmigrated DB file."""
class DatabaseFile: # pylint: disable=too-few-public-methods
self.path = path
self._check()
- def remake(self) -> None:
- """Create tables in self.path file as per PATH_DB_SCHEMA sql file."""
- with sql_connect(self.path) as conn:
+ @classmethod
+ def create_at(cls, path: str) -> DatabaseFile:
+ """Make new DB file at path."""
+ with sql_connect(path) as conn:
with open(PATH_DB_SCHEMA, 'r', encoding='utf-8') as f:
conn.executescript(f.read())
- self._check()
+ conn.execute(f'PRAGMA user_version = {EXPECTED_DB_VERSION}')
+ return cls(path)
+
+ @classmethod
+ def migrate(cls, path: str) -> DatabaseFile:
+ """Apply migrations from_version to EXPECTED_DB_VERSION."""
+ migrations = cls._available_migrations()
+ from_version = cls.get_version_of_db(path)
+ migrations_todo = migrations[from_version+1:]
+ for j, filename in enumerate(migrations_todo):
+ with sql_connect(path) as conn:
+ with open(f'{MIGRATIONS_DIR}/{filename}', 'r',
+ encoding='utf-8') as f:
+ conn.executescript(f.read())
+ user_version = from_version + j + 1
+ with sql_connect(path) as conn:
+ conn.execute(f'PRAGMA user_version = {user_version}')
+ return cls(path)
def _check(self) -> None:
"""Check file exists, and is of proper DB version and schema."""
- self.exists = isfile(self.path)
- if self.exists:
- self._validate_user_version()
- self._validate_schema()
+ if not isfile(self.path):
+ raise NotFoundException
+ if self.user_version != EXPECTED_DB_VERSION:
+ raise UnmigratedDbException()
+ self._validate_schema()
+
+ @staticmethod
+ def _available_migrations() -> list[str]:
+ """Validate migrations directory and return sorted entries."""
+ msg_too_big = 'Migration directory points beyond expected DB version.'
+ msg_bad_entry = 'Migration directory contains unexpected entry: '
+ msg_missing = 'Migration directory misses migration of number: '
+ migrations = {}
+ for entry in listdir(MIGRATIONS_DIR):
+ if entry == FILENAME_DB_SCHEMA:
+ continue
+ toks = entry.split('_', 1)
+ if len(toks) < 2:
+ raise HandledException(msg_bad_entry + entry)
+ try:
+ i = int(toks[0])
+ except ValueError as e:
+ raise HandledException(msg_bad_entry + entry) from e
+ if i > EXPECTED_DB_VERSION:
+ raise HandledException(msg_too_big)
+ migrations[i] = toks[1]
+ migrations_list = []
+ for i in range(EXPECTED_DB_VERSION + 1):
+ if i not in migrations:
+ raise HandledException(msg_missing + str(i))
+ migrations_list += [f'{i}_{migrations[i]}']
+ return migrations_list
- def _validate_user_version(self) -> None:
- """Compare DB user_version with EXPECTED_DB_VERSION."""
+ @staticmethod
+ def get_version_of_db(path: str) -> int:
+ """Get DB user_version, fail if outside expected range."""
sql_for_db_version = 'PRAGMA user_version'
- with sql_connect(self.path) as conn:
+ with sql_connect(path) as conn:
db_version = list(conn.execute(sql_for_db_version))[0][0]
- if db_version != EXPECTED_DB_VERSION:
- msg = f'Wrong DB version, expected '\
- f'{EXPECTED_DB_VERSION}, got {db_version}.'
- raise HandledException(msg)
+ if db_version > EXPECTED_DB_VERSION:
+ msg = f'Wrong DB version, expected '\
+ f'{EXPECTED_DB_VERSION}, got unknown {db_version}.'
+ raise HandledException(msg)
+ assert isinstance(db_version, int)
+ return db_version
+
+ @property
+ def user_version(self) -> int:
+ """Get DB user_version."""
+ return self.__class__.get_version_of_db(self.path)
def _validate_schema(self) -> None:
"""Compare found schema with what's stored at PATH_DB_SCHEMA."""
+
+ def reformat_rows(rows: list[str]) -> list[str]:
+ new_rows = []
+ for row in rows:
+ new_row = []
+ for subrow in row.split('\n'):
+ subrow = subrow.rstrip()
+ in_parentheses = 0
+ split_at = []
+ for i, c in enumerate(subrow):
+ if '(' == c:
+ in_parentheses += 1
+ elif ')' == c:
+ in_parentheses -= 1
+ elif ',' == c and 0 == in_parentheses:
+ split_at += [i + 1]
+ prev_split = 0
+ for i in split_at:
+ segment = subrow[prev_split:i].strip()
+ if len(segment) > 0:
+ new_row += [f' {segment}']
+ prev_split = i
+ segment = subrow[prev_split:].strip()
+ if len(segment) > 0:
+ new_row += [f' {segment}']
+ new_row[0] = new_row[0].lstrip()
+ new_row[-1] = new_row[-1].lstrip()
+ if new_row[-1] != ')' and new_row[-3][-1] != ',':
+ new_row[-3] = new_row[-3] + ','
+ new_row[-2:] = [' ' + new_row[-1][:-1]] + [')']
+ new_rows += ['\n'.join(new_row)]
+ return new_rows
+
sql_for_schema = 'SELECT sql FROM sqlite_master ORDER BY sql'
msg_err = 'Database has wrong tables schema. Diff:\n'
with sql_connect(self.path) as conn:
schema_rows = [r[0] for r in conn.execute(sql_for_schema) if r[0]]
- retrieved_schema = ';\n'.join(schema_rows) + ';'
- with open(PATH_DB_SCHEMA, 'r', encoding='utf-8') as f:
- stored_schema = f.read().rstrip()
- if stored_schema != retrieved_schema:
- diff_msg = Differ().compare(retrieved_schema.splitlines(),
- stored_schema.splitlines())
- raise HandledException(msg_err + '\n'.join(diff_msg))
+ schema_rows = reformat_rows(schema_rows)
+ retrieved_schema = ';\n'.join(schema_rows) + ';'
+ with open(PATH_DB_SCHEMA, 'r', encoding='utf-8') as f:
+ stored_schema = f.read().rstrip()
+ if stored_schema != retrieved_schema:
+ diff_msg = Differ().compare(retrieved_schema.splitlines(),
+ stored_schema.splitlines())
+ raise HandledException(msg_err + '\n'.join(diff_msg))
class DatabaseConnection:
self.conn.close()
def rewrite_relations(self, table_name: str, key: str, target: int | str,
- rows: list[list[Any]]) -> None:
- """Rewrite relations in table_name to target, with rows values."""
+ rows: list[list[Any]], key_index: int = 0) -> None:
+ # pylint: disable=too-many-arguments
+ """Rewrite relations in table_name to target, with rows values.
+
+ Note that single rows are expected without the column and value
+ identified by key and target, which are inserted inside the function
+ at key_index.
+ """
self.delete_where(table_name, key, target)
for row in rows:
- values = tuple([target] + row)
+ values = tuple(row[:key_index] + [target] + row[key_index:])
q_marks = self.__class__.q_marks_from_values(values)
self.exec(f'INSERT INTO {table_name} VALUES {q_marks}', values)
return list(self.exec(f'SELECT * FROM {table_name} WHERE {key} = ?',
(target,)))
+ # def column_where_pattern(self,
+ # table_name: str,
+ # column: str,
+ # pattern: str,
+ # keys: list[str]) -> list[Any]:
+ # """Return column of rows where one of keys matches pattern."""
+ # targets = tuple([f'%{pattern}%'] * len(keys))
+ # haystack = ' OR '.join([f'{k} LIKE ?' for k in keys])
+ # sql = f'SELECT {column} FROM {table_name} WHERE {haystack}'
+ # return [row[0] for row in self.exec(sql, targets)]
+
def column_where(self, table_name: str, column: str, key: str,
target: int | str) -> list[Any]:
"""Return column of table where key == target."""
table_name = ''
to_save: list[str] = []
to_save_versioned: list[str] = []
- to_save_relations: list[tuple[str, str, str]] = []
+ to_save_relations: list[tuple[str, str, str, int]] = []
id_: None | BaseModelId
cache_: dict[BaseModelId, Self]
+ to_search: list[str] = []
def __init__(self, id_: BaseModelId | None) -> None:
if isinstance(id_, int) and id_ < 1:
items[item.id_] = item
return list(items.values())
+ @classmethod
+ def by_date_range_with_limits(cls: type[BaseModelInstance],
+ db_conn: DatabaseConnection,
+ date_range: tuple[str, str],
+ date_col: str = 'day'
+ ) -> tuple[list[BaseModelInstance], str,
+ str]:
+ """Return list of Days in database within (open) date_range interval.
+
+ If no range values provided, defaults them to 'yesterday' and
+ 'tomorrow'. Knows to properly interpret these and 'today' as value.
+ """
+ start_str = date_range[0] if date_range[0] else 'yesterday'
+ end_str = date_range[1] if date_range[1] else 'tomorrow'
+ start_date = valid_date(start_str)
+ end_date = valid_date(end_str)
+ items = []
+ sql = f'SELECT id FROM {cls.table_name} '
+ sql += f'WHERE {date_col} >= ? AND {date_col} <= ?'
+ for row in db_conn.exec(sql, (start_date, end_date)):
+ items += [cls.by_id(db_conn, row[0])]
+ return items, start_date, end_date
+
+ @classmethod
+ def matching(cls: type[BaseModelInstance], db_conn: DatabaseConnection,
+ pattern: str) -> list[BaseModelInstance]:
+ """Return all objects whose .to_search match pattern."""
+ items = cls.all(db_conn)
+ if pattern:
+ filtered = []
+ for item in items:
+ for attr_name in cls.to_search:
+ toks = attr_name.split('.')
+ parent = item
+ for tok in toks:
+ attr = getattr(parent, tok)
+ parent = attr
+ if pattern in attr:
+ filtered += [item]
+ break
+ return filtered
+ return items
+
def save(self, db_conn: DatabaseConnection) -> None:
"""Write self to DB and cache and ensure .id_.
self.cache()
for attr_name in self.to_save_versioned:
getattr(self, attr_name).save(db_conn)
- for table, column, attr_name in self.to_save_relations:
+ for table, column, attr_name, key_index in self.to_save_relations:
assert isinstance(self.id_, (int, str))
db_conn.rewrite_relations(table, column, self.id_,
[[i.id_] for i
- in getattr(self, attr_name)])
+ in getattr(self, attr_name)], key_index)
def remove(self, db_conn: DatabaseConnection) -> None:
"""Remove from DB and cache, including dependencies."""
raise HandledException('cannot remove unsaved item')
for attr_name in self.to_save_versioned:
getattr(self, attr_name).remove(db_conn)
- for table, column, attr_name in self.to_save_relations:
+ for table, column, attr_name, _ in self.to_save_relations:
db_conn.delete_where(table, column, self.id_)
self.uncache()
db_conn.delete_where(self.table_name, 'id', self.id_)
"""Web server stuff."""
-from typing import Any, NamedTuple
+from typing import Any
from http.server import BaseHTTPRequestHandler
from http.server import HTTPServer
from urllib.parse import urlparse, parse_qs
from os.path import split as path_split
from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
-from plomtask.days import Day, todays_date
+from plomtask.dating import date_in_n_days
+from plomtask.days import Day
from plomtask.exceptions import HandledException, BadFormatException, \
NotFoundException
from plomtask.db import DatabaseConnection, DatabaseFile
"""Handle any GET request."""
try:
self._init_handling()
- if self.site in {'calendar', 'day', 'process', 'processes', 'todo',
- 'condition', 'conditions'}:
+ if hasattr(self, f'do_GET_{self.site}'):
template = f'{self.site}.html'
ctx = getattr(self, f'do_GET_{self.site}')()
html = self.server.jinja.get_template(template).render(**ctx)
"""Show Days from ?start= to ?end=."""
start = self.params.get_str('start')
end = self.params.get_str('end')
- days = Day.all(self.conn, date_range=(start, end), fill_gaps=True)
- return {'start': start, 'end': end, 'days': days}
+ if not end:
+ end = date_in_n_days(60)
+ ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
+ days, start, end = ret
+ days = Day.with_filled_gaps(days, start, end)
+ for day in days:
+ day.collect_calendarized_todos(self.conn)
+ today = date_in_n_days(0)
+ return {'start': start, 'end': end, 'days': days, 'today': today}
def do_GET_day(self) -> dict[str, object]:
"""Show single Day of ?date=."""
-
- class ConditionListing(NamedTuple):
- """Listing of Condition augmented with its enablers, disablers."""
- condition: Condition
- enablers: list[Todo]
- disablers: list[Todo]
-
- date = self.params.get_str('date', todays_date())
- top_todos = [t for t in Todo.by_date(self.conn, date) if not t.parents]
- todo_trees = [t.get_undone_steps_tree() for t in top_todos]
- done_trees = []
- for t in top_todos:
- done_trees += t.get_done_steps_tree()
- condition_listings: list[ConditionListing] = []
- for cond in Condition.all(self.conn):
- enablers = Todo.enablers_for_at(self.conn, cond, date)
- disablers = Todo.disablers_for_at(self.conn, cond, date)
- condition_listings += [ConditionListing(cond, enablers, disablers)]
+ date = self.params.get_str('date', date_in_n_days(0))
+ todays_todos = Todo.by_date(self.conn, date)
+ conditions_present = []
+ enablers_for = {}
+ disablers_for = {}
+ for todo in todays_todos:
+ for condition in todo.conditions + todo.blockers:
+ if condition not in conditions_present:
+ conditions_present += [condition]
+ enablers_for[condition.id_] = [p for p in
+ Process.all(self.conn)
+ if condition in p.enables]
+ disablers_for[condition.id_] = [p for p in
+ Process.all(self.conn)
+ if condition in p.disables]
+ seen_todos: set[int] = set()
+ top_nodes = [t.get_step_tree(seen_todos)
+ for t in todays_todos if not t.parents]
return {'day': Day.by_id(self.conn, date, create=True),
- 'todo_trees': todo_trees,
- 'done_trees': done_trees,
- 'processes': Process.all(self.conn),
- 'condition_listings': condition_listings}
+ 'top_nodes': top_nodes,
+ 'enablers_for': enablers_for,
+ 'disablers_for': disablers_for,
+ 'conditions_present': conditions_present,
+ 'processes': Process.all(self.conn)}
def do_GET_todo(self) -> dict[str, object]:
"""Show single Todo of ?id=."""
'todo_candidates': Todo.by_date(self.conn, todo.date),
'condition_candidates': Condition.all(self.conn)}
+ def do_GET_todos(self) -> dict[str, object]:
+ """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
+ sort_by = self.params.get_str('sort_by')
+ start = self.params.get_str('start')
+ end = self.params.get_str('end')
+ process_id = self.params.get_int_or_none('process_id')
+ comment_pattern = self.params.get_str('comment_pattern')
+ todos = []
+ ret = Todo.by_date_range_with_limits(self.conn, (start, end))
+ todos_by_date_range, start, end = ret
+ todos = [t for t in todos_by_date_range
+ if comment_pattern in t.comment
+ and ((not process_id) or t.process.id_ == process_id)]
+ if sort_by == 'doneness':
+ todos.sort(key=lambda t: t.is_done)
+ elif sort_by == '-doneness':
+ todos.sort(key=lambda t: t.is_done, reverse=True)
+ elif sort_by == 'title':
+ todos.sort(key=lambda t: t.title_then)
+ elif sort_by == '-title':
+ todos.sort(key=lambda t: t.title_then, reverse=True)
+ elif sort_by == 'comment':
+ todos.sort(key=lambda t: t.comment)
+ elif sort_by == '-comment':
+ todos.sort(key=lambda t: t.comment, reverse=True)
+ elif sort_by == '-date':
+ todos.sort(key=lambda t: t.date, reverse=True)
+ else:
+ todos.sort(key=lambda t: t.date)
+ return {'start': start, 'end': end, 'process_id': process_id,
+ 'comment_pattern': comment_pattern, 'todos': todos,
+ 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
+
def do_GET_conditions(self) -> dict[str, object]:
"""Show all Conditions."""
- return {'conditions': Condition.all(self.conn)}
+ pattern = self.params.get_str('pattern')
+ conditions = Condition.matching(self.conn, pattern)
+ sort_by = self.params.get_str('sort_by')
+ if sort_by == 'is_active':
+ conditions.sort(key=lambda c: c.is_active)
+ elif sort_by == '-is_active':
+ conditions.sort(key=lambda c: c.is_active, reverse=True)
+ elif sort_by == '-title':
+ conditions.sort(key=lambda c: c.title.newest, reverse=True)
+ else:
+ conditions.sort(key=lambda c: c.title.newest)
+ return {'conditions': conditions,
+ 'sort_by': sort_by,
+ 'pattern': pattern}
def do_GET_condition(self) -> dict[str, object]:
"""Show Condition of ?id=."""
id_ = self.params.get_int_or_none('id')
- return {'condition': Condition.by_id(self.conn, id_, create=True)}
+ c = Condition.by_id(self.conn, id_, create=True)
+ ps = Process.all(self.conn)
+ return {'condition': c,
+ 'enabled_processes': [p for p in ps if c in p.conditions],
+ 'disabled_processes': [p for p in ps if c in p.blockers],
+ 'enabling_processes': [p for p in ps if c in p.enables],
+ 'disabling_processes': [p for p in ps if c in p.disables]}
+
+ def do_GET_condition_titles(self) -> dict[str, object]:
+ """Show title history of Condition of ?id=."""
+ id_ = self.params.get_int_or_none('id')
+ condition = Condition.by_id(self.conn, id_)
+ return {'condition': condition}
+
+ def do_GET_condition_descriptions(self) -> dict[str, object]:
+ """Show description historys of Condition of ?id=."""
+ id_ = self.params.get_int_or_none('id')
+ condition = Condition.by_id(self.conn, id_)
+ return {'condition': condition}
def do_GET_process(self) -> dict[str, object]:
- """Show process of ?id=."""
+ """Show Process of ?id=."""
id_ = self.params.get_int_or_none('id')
process = Process.by_id(self.conn, id_, create=True)
return {'process': process,
'steps': process.get_steps(self.conn),
'owners': process.used_as_step_by(self.conn),
+ 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
'step_candidates': Process.all(self.conn),
'condition_candidates': Condition.all(self.conn)}
+ def do_GET_process_titles(self) -> dict[str, object]:
+ """Show title history of Process of ?id=."""
+ id_ = self.params.get_int_or_none('id')
+ process = Process.by_id(self.conn, id_)
+ return {'process': process}
+
+ def do_GET_process_descriptions(self) -> dict[str, object]:
+ """Show description historys of Process of ?id=."""
+ id_ = self.params.get_int_or_none('id')
+ process = Process.by_id(self.conn, id_)
+ return {'process': process}
+
+ def do_GET_process_efforts(self) -> dict[str, object]:
+ """Show default effort history of Process of ?id=."""
+ id_ = self.params.get_int_or_none('id')
+ process = Process.by_id(self.conn, id_)
+ return {'process': process}
+
def do_GET_processes(self) -> dict[str, object]:
"""Show all Processes."""
- return {'processes': Process.all(self.conn)}
+ pattern = self.params.get_str('pattern')
+ processes = Process.matching(self.conn, pattern)
+ sort_by = self.params.get_str('sort_by')
+ if sort_by == 'steps':
+ processes.sort(key=lambda p: len(p.explicit_steps))
+ elif sort_by == '-steps':
+ processes.sort(key=lambda p: len(p.explicit_steps), reverse=True)
+ elif sort_by == 'effort':
+ processes.sort(key=lambda p: p.effort.newest)
+ elif sort_by == '-effort':
+ processes.sort(key=lambda p: p.effort.newest, reverse=True)
+ elif sort_by == '-title':
+ processes.sort(key=lambda p: p.title.newest, reverse=True)
+ else:
+ processes.sort(key=lambda p: p.title.newest)
+ return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
def do_POST(self) -> None:
"""Handle any POST request."""
postvars = parse_qs(self.rfile.read(length).decode(),
keep_blank_values=True, strict_parsing=True)
self.form_data = InputsParser(postvars)
- if self.site in ('day', 'process', 'todo', 'condition'):
+ if hasattr(self, f'do_POST_{self.site}'):
redir_target = getattr(self, f'do_POST_{self.site}')()
self.conn.commit()
else:
"""Update or insert Day of date and Todos mapped to it."""
date = self.params.get_str('date')
day = Day.by_id(self.conn, date, create=True)
- day.comment = self.form_data.get_str('comment')
+ day.comment = self.form_data.get_str('day_comment')
day.save(self.conn)
- existing_todos = Todo.by_date(self.conn, date)
- for process_id in self.form_data.get_all_int('new_todo'):
- process = Process.by_id(self.conn, process_id)
- todo = Todo(None, process, False, day.date)
- todo.save(self.conn)
- todo.adopt_from(existing_todos)
- todo.make_missing_children(self.conn)
- todo.save(self.conn)
- for todo_id in self.form_data.get_all_int('done'):
+ for process_id in sorted(self.form_data.get_all_int('new_todo')):
+ Todo.create_with_children(self.conn, process_id, date)
+ done_ids = self.form_data.get_all_int('done')
+ comments = self.form_data.get_all_str('comment')
+ efforts = self.form_data.get_all_str('effort')
+ for i, todo_id in enumerate(self.form_data.get_all_int('todo_id')):
todo = Todo.by_id(self.conn, todo_id)
- todo.is_done = True
+ todo.is_done = todo_id in done_ids
+ if len(comments) > 0:
+ todo.comment = comments[i]
+ if len(efforts) > 0:
+ todo.effort = float(efforts[i]) if efforts[i] else None
todo.save(self.conn)
for condition in todo.enables:
condition.save(self.conn)
continue
child = Todo.by_id(self.conn, child_id)
todo.add_child(child)
+ effort = self.form_data.get_str('effort', ignore_strict=True)
+ todo.effort = float(effort) if effort else None
todo.set_conditions(self.conn, self.form_data.get_all_int('condition'))
+ todo.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
todo.set_enables(self.conn, self.form_data.get_all_int('enables'))
todo.set_disables(self.conn, self.form_data.get_all_int('disables'))
todo.is_done = len(self.form_data.get_all_str('done')) > 0
+ todo.calendarize = len(self.form_data.get_all_str('calendarize')) > 0
+ todo.comment = self.form_data.get_str('comment', ignore_strict=True)
todo.save(self.conn)
for condition in todo.enables:
condition.save(self.conn)
process.effort.set(self.form_data.get_float('effort'))
process.set_conditions(self.conn,
self.form_data.get_all_int('condition'))
+ process.set_blockers(self.conn, self.form_data.get_all_int('blocker'))
process.set_enables(self.conn, self.form_data.get_all_int('enables'))
process.set_disables(self.conn, self.form_data.get_all_int('disables'))
+ process.calendarize = self.form_data.get_all_str('calendarize') != []
process.save(self.conn)
- process.explicit_steps = []
steps: list[tuple[int | None, int, int | None]] = []
+ for step_id in self.form_data.get_all_int('keep_step'):
+ if step_id not in self.form_data.get_all_int('steps'):
+ raise BadFormatException('trying to keep unknown step')
for step_id in self.form_data.get_all_int('steps'):
- for step_process_id in self.form_data.get_all_int(
- f'new_step_to_{step_id}'):
- steps += [(None, step_process_id, step_id)]
if step_id not in self.form_data.get_all_int('keep_step'):
continue
step_process_id = self.form_data.get_int(
parent_id = self.form_data.get_int_or_none(
f'step_{step_id}_parent_id')
steps += [(step_id, step_process_id, parent_id)]
+ for step_id in self.form_data.get_all_int('steps'):
+ for step_process_id in self.form_data.get_all_int(
+ f'new_step_to_{step_id}'):
+ steps += [(None, step_process_id, step_id)]
for step_process_id in self.form_data.get_all_int('new_top_step'):
steps += [(None, step_process_id, None)]
+ process.uncache()
process.set_steps(self.conn, steps)
process.save(self.conn)
return f'/process?id={process.id_}'
class Process(BaseModel[int], ConditionsRelations):
"""Template for, and metadata for, Todos, and their arrangements."""
+ # pylint: disable=too-many-instance-attributes
table_name = 'processes'
+ to_save = ['calendarize']
to_save_versioned = ['title', 'description', 'effort']
- to_save_relations = [('process_conditions', 'process', 'conditions'),
- ('process_enables', 'process', 'enables'),
- ('process_disables', 'process', 'disables')]
-
- # pylint: disable=too-many-instance-attributes
-
- def __init__(self, id_: int | None) -> None:
- super().__init__(id_)
+ to_save_relations = [('process_conditions', 'process', 'conditions', 0),
+ ('process_blockers', 'process', 'blockers', 0),
+ ('process_enables', 'process', 'enables', 0),
+ ('process_disables', 'process', 'disables', 0)]
+ to_search = ['title.newest', 'description.newest']
+
+ def __init__(self, id_: int | None, calendarize: bool = False) -> None:
+ BaseModel.__init__(self, id_)
+ ConditionsRelations.__init__(self)
self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
self.description = VersionedAttribute(self, 'process_descriptions', '')
self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
self.explicit_steps: list[ProcessStep] = []
- self.conditions: list[Condition] = []
- self.enables: list[Condition] = []
- self.disables: list[Condition] = []
+ self.calendarize = calendarize
@classmethod
def from_table_row(cls, db_conn: DatabaseConnection,
process.id_):
step = ProcessStep.from_table_row(db_conn, row_)
process.explicit_steps += [step] # pylint: disable=no-member
- for name in ('conditions', 'enables', 'disables'):
+ for name in ('conditions', 'blockers', 'enables', 'disables'):
table = f'process_{name}'
assert isinstance(process.id_, int)
for c_id in db_conn.column_where(table, 'condition',
just deleted under its feet), or if the parent step would not be
owned by the current Process.
"""
-
def walk_steps(node: ProcessStep) -> None:
if node.step_process_id == self.id_:
raise BadFormatException('bad step selection causes recursion')
from typing import Any
from sqlite3 import Row
from plomtask.db import DatabaseConnection, BaseModel
-from plomtask.processes import Process
+from plomtask.processes import Process, ProcessStepsNode
+from plomtask.versioned_attributes import VersionedAttribute
from plomtask.conditions import Condition, ConditionsRelations
from plomtask.exceptions import (NotFoundException, BadFormatException,
HandledException)
+from plomtask.dating import valid_date
@dataclass
-class TodoStepsNode:
+class TodoNode:
"""Collects what's useful to know for Todo/Condition tree display."""
- item: Todo | Condition
- is_todo: bool
- children: list[TodoStepsNode]
+ todo: Todo
seen: bool
- hide: bool
+ children: list[TodoNode]
class Todo(BaseModel[int], ConditionsRelations):
"""Individual actionable."""
# pylint: disable=too-many-instance-attributes
table_name = 'todos'
- to_save = ['process_id', 'is_done', 'date']
- to_save_relations = [('todo_conditions', 'todo', 'conditions'),
- ('todo_enables', 'todo', 'enables'),
- ('todo_disables', 'todo', 'disables'),
- ('todo_children', 'parent', 'children'),
- ('todo_children', 'child', 'parents')]
+ to_save = ['process_id', 'is_done', 'date', 'comment', 'effort',
+ 'calendarize']
+ to_save_relations = [('todo_conditions', 'todo', 'conditions', 0),
+ ('todo_blockers', 'todo', 'blockers', 0),
+ ('todo_enables', 'todo', 'enables', 0),
+ ('todo_disables', 'todo', 'disables', 0),
+ ('todo_children', 'parent', 'children', 0),
+ ('todo_children', 'child', 'parents', 1)]
+ to_search = ['comment']
- def __init__(self, id_: int | None, process: Process,
- is_done: bool, date: str) -> None:
- super().__init__(id_)
+ # pylint: disable=too-many-arguments
+ def __init__(self, id_: int | None,
+ process: Process,
+ is_done: bool,
+ date: str, comment: str = '',
+ effort: None | float = None,
+ calendarize: bool = False) -> None:
+ BaseModel.__init__(self, id_)
+ ConditionsRelations.__init__(self)
if process.id_ is None:
raise NotFoundException('Process of Todo without ID (not saved?)')
self.process = process
self._is_done = is_done
- self.date = date
+ self.date = valid_date(date)
+ self.comment = comment
+ self.effort = effort
self.children: list[Todo] = []
self.parents: list[Todo] = []
- self.conditions: list[Condition] = []
- self.enables: list[Condition] = []
- self.disables: list[Condition] = []
+ self.calendarize = calendarize
if not self.id_:
+ self.calendarize = self.process.calendarize
self.conditions = self.process.conditions[:]
+ self.blockers = self.process.blockers[:]
self.enables = self.process.enables[:]
self.disables = self.process.disables[:]
+ @classmethod
+ def by_date_range(cls, db_conn: DatabaseConnection,
+ date_range: tuple[str, str] = ('', '')) -> list[Todo]:
+ """Collect Todos of Days within date_range."""
+ todos, _, _ = cls.by_date_range_with_limits(db_conn, date_range)
+ return todos
+
+ @classmethod
+ def create_with_children(cls, db_conn: DatabaseConnection,
+ process_id: int, date: str) -> Todo:
+ """Create Todo of process for date, ensure children."""
+
+ def key_order_func(n: ProcessStepsNode) -> int:
+ assert isinstance(n.process.id_, int)
+ return n.process.id_
+
+ def walk_steps(parent: Todo, step_node: ProcessStepsNode) -> Todo:
+ adoptables = [t for t in cls.by_date(db_conn, date)
+ if (t not in parent.children)
+ and (t != parent)
+ and step_node.process == t.process]
+ satisfier = None
+ for adoptable in adoptables:
+ satisfier = adoptable
+ break
+ if not satisfier:
+ satisfier = cls(None, step_node.process, False, date)
+ satisfier.save(db_conn)
+ sub_step_nodes = list(step_node.steps.values())
+ sub_step_nodes.sort(key=key_order_func)
+ for sub_node in sub_step_nodes:
+ n_slots = len([n for n in sub_step_nodes
+ if n.process == sub_node.process])
+ filled_slots = len([t for t in satisfier.children
+ if t.process == sub_node.process])
+ # if we did not newly create satisfier, it may already fill
+ # some step dependencies, so only fill what remains open
+ if n_slots - filled_slots > 0:
+ satisfier.add_child(walk_steps(satisfier, sub_node))
+ satisfier.save(db_conn)
+ return satisfier
+
+ process = Process.by_id(db_conn, process_id)
+ todo = cls(None, process, False, date)
+ todo.save(db_conn)
+ steps_tree = process.get_steps(db_conn)
+ for step_node in steps_tree.values():
+ todo.add_child(walk_steps(todo, step_node))
+ todo.save(db_conn)
+ return todo
+
@classmethod
def from_table_row(cls, db_conn: DatabaseConnection,
row: Row | list[Any]) -> Todo:
'child', todo.id_):
# pylint: disable=no-member
todo.parents += [cls.by_id(db_conn, t_id)]
- for name in ('conditions', 'enables', 'disables'):
+ for name in ('conditions', 'blockers', 'enables', 'disables'):
table = f'todo_{name}'
assert isinstance(todo.id_, int)
for cond_id in db_conn.column_where(table, 'condition',
return todo
@classmethod
- def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
- """Collect all Todos for Day of date."""
- todos = []
- for id_ in db_conn.column_where('todos', 'id', 'day', date):
- todos += [cls.by_id(db_conn, id_)]
- return todos
-
- @staticmethod
- def _x_ablers_for_at(db_conn: DatabaseConnection, name: str,
- cond: Condition, date: str) -> list[Todo]:
- """Collect all Todos of day that [name] condition."""
- assert isinstance(cond.id_, int)
- x_ablers = []
- table = f'todo_{name}'
- for id_ in db_conn.column_where(table, 'todo', 'condition', cond.id_):
- todo = Todo.by_id(db_conn, id_)
- if todo.date == date:
- x_ablers += [todo]
- return x_ablers
-
- @classmethod
- def enablers_for_at(cls, db_conn: DatabaseConnection,
- condition: Condition, date: str) -> list[Todo]:
- """Collect all Todos of day that enable condition."""
- return cls._x_ablers_for_at(db_conn, 'enables', condition, date)
+ def by_process_id(cls, db_conn: DatabaseConnection,
+ process_id: int | None) -> list[Todo]:
+ """Collect all Todos of Process of process_id."""
+ return [t for t in cls.all(db_conn) if t.process.id_ == process_id]
@classmethod
- def disablers_for_at(cls, db_conn: DatabaseConnection,
- condition: Condition, date: str) -> list[Todo]:
- """Collect all Todos of day that disable condition."""
- return cls._x_ablers_for_at(db_conn, 'disables', condition, date)
+ def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
+ """Collect all Todos for Day of date."""
+ return cls.by_date_range(db_conn, (date, date))
@property
def is_doable(self) -> bool:
for condition in self.conditions:
if not condition.is_active:
return False
+ for condition in self.blockers:
+ if condition.is_active:
+ return False
return True
@property
- def process_id(self) -> int | str | None:
- """Return ID of tasked Process."""
- return self.process.id_
+ def is_deletable(self) -> bool:
+ """Decide whether self be deletable (not if preserve-worthy values)."""
+ if self.comment:
+ return False
+ if self.effort and self.effort >= 0:
+ return False
+ return True
@property
- def unsatisfied_dependencies(self) -> list[int]:
- """Return Process IDs of .process.explicit_steps not in .children."""
- unsatisfied = [s.step_process_id for s in self.process.explicit_steps
- if s.parent_step_id is None]
- for child_process_id in [c.process.id_ for c in self.children]:
- if child_process_id in unsatisfied:
- unsatisfied.remove(child_process_id)
- return unsatisfied
+ def process_id(self) -> int | str | None:
+ """Needed for super().save to save Processes as attributes."""
+ return self.process.id_
@property
def is_done(self) -> bool:
for condition in self.disables:
condition.is_active = False
- def adopt_from(self, todos: list[Todo]) -> None:
- """As far as possible, fill unsatisfied dependencies from todos."""
- for process_id in self.unsatisfied_dependencies:
- for todo in [t for t in todos if t.process.id_ == process_id
- and t not in self.children]:
- self.add_child(todo)
- break
-
- def make_missing_children(self, db_conn: DatabaseConnection) -> None:
- """Fill unsatisfied dependencies with new Todos."""
- for process_id in self.unsatisfied_dependencies:
- process = Process.by_id(db_conn, process_id)
- todo = self.__class__(None, process, False, self.date)
- todo.save(db_conn)
- self.add_child(todo)
-
- def get_step_tree(self, seen_todos: set[int],
- seen_conditions: set[int]) -> TodoStepsNode:
- """Return tree of depended-on Todos and Conditions."""
-
- def make_node(step: Todo | Condition) -> TodoStepsNode:
- assert isinstance(step.id_, int)
- is_todo = isinstance(step, Todo)
- children = []
- if is_todo:
- assert isinstance(step, Todo)
- seen = step.id_ in seen_todos
- seen_todos.add(step.id_)
- potentially_enabled = set()
- for child in step.children:
- for condition in child.enables:
- potentially_enabled.add(condition.id_)
- children += [make_node(child)]
- for condition in [c for c in step.conditions
- if (not c.is_active)
- and (c.id_ not in potentially_enabled)]:
- children += [make_node(condition)]
- else:
- seen = step.id_ in seen_conditions
- seen_conditions.add(step.id_)
- return TodoStepsNode(step, is_todo, children, seen, False)
-
- node = make_node(self)
- return node
-
- def get_undone_steps_tree(self) -> TodoStepsNode:
- """Return tree of depended-on undone Todos and Conditions."""
+ @property
+ def title(self) -> VersionedAttribute:
+ """Shortcut to .process.title."""
+ return self.process.title
- def walk_tree(node: TodoStepsNode) -> None:
- if isinstance(node.item, Todo) and node.item.is_done:
- node.hide = True
- for child in node.children:
- walk_tree(child)
+ @property
+ def title_then(self) -> str:
+ """Shortcut to .process.title.at(self.date)"""
+ title_then = self.process.title.at(self.date)
+ assert isinstance(title_then, str)
+ return title_then
- seen_todos: set[int] = set()
- seen_conditions: set[int] = set()
- step_tree = self.get_step_tree(seen_todos, seen_conditions)
- walk_tree(step_tree)
- return step_tree
+ @property
+ def effort_then(self) -> float:
+ """Shortcut to .process.effort.at(self.date)"""
+ effort_then = self.process.effort.at(self.date)
+ assert isinstance(effort_then, float)
+ return effort_then
- def get_done_steps_tree(self) -> list[TodoStepsNode]:
- """Return tree of depended-on done Todos."""
+ def get_step_tree(self, seen_todos: set[int]) -> TodoNode:
+ """Return tree of depended-on Todos."""
- def make_nodes(node: TodoStepsNode) -> list[TodoStepsNode]:
- children: list[TodoStepsNode] = []
- if not isinstance(node.item, Todo):
- return children
- for child in node.children:
- children += make_nodes(child)
- if node.item.is_done:
- node.children = children
- return [node]
- return children
+ def make_node(todo: Todo) -> TodoNode:
+ children = []
+ seen = todo.id_ in seen_todos
+ assert isinstance(todo.id_, int)
+ seen_todos.add(todo.id_)
+ for child in todo.children:
+ children += [make_node(child)]
+ return TodoNode(todo, seen, children)
- seen_todos: set[int] = set()
- seen_conditions: set[int] = set()
- step_tree = self.get_step_tree(seen_todos, seen_conditions)
- nodes = make_nodes(step_tree)
- return nodes
+ return make_node(self)
def add_child(self, child: Todo) -> None:
"""Add child to self.children, avoid recursion, update parenthoods."""
self.children.remove(child)
child.parents.remove(self)
+ def save(self, db_conn: DatabaseConnection) -> None:
+ """On save calls, also check if auto-deletion by effort < 0."""
+ if self.effort and self.effort < 0 and self.is_deletable:
+ self.remove(db_conn)
+ return
+ super().save(db_conn)
+
def remove(self, db_conn: DatabaseConnection) -> None:
"""Remove from DB, including relations."""
+ if not self.is_deletable:
+ raise HandledException('Cannot remove non-deletable Todo.')
children_to_remove = self.children[:]
parents_to_remove = self.parents[:]
for child in children_to_remove:
def at(self, queried_time: str) -> str | float:
"""Retrieve value of timestamp nearest queried_time from the past."""
+ if len(queried_time) == 10:
+ queried_time += ' 23:59:59.999'
sorted_timestamps = sorted(self.history.keys())
if 0 == len(sorted_timestamps):
return self.default
Jinja2==3.1.3
+unittest-parallel==1.6.1
"""Call this to start the application."""
from sys import exit as sys_exit
from os import environ
-from plomtask.exceptions import HandledException
+from plomtask.exceptions import HandledException, NotFoundException
from plomtask.http import TaskHandler, TaskServer
-from plomtask.db import DatabaseFile
+from plomtask.db import DatabaseFile, UnmigratedDbException
PLOMTASK_DB_PATH = environ.get('PLOMTASK_DB_PATH')
HTTP_PORT = 8082
DB_CREATION_ASK = 'Database file not found. Create? Y/n\n'
+DB_MIGRATE_ASK = 'Database file needs migration. Migrate? Y/n\n'
+
+
+def yes_or_fail(question: str, fail_msg: str) -> None:
+ """Ask question, raise HandledException(fail_msg) if reply not yes."""
+ reply = input(question)
+ if not reply.lower() in {'y', 'yes', 'yes.', 'yes!'}:
+ print('Not recognizing reply as "yes".')
+ raise HandledException(fail_msg)
if __name__ == '__main__':
try:
if not PLOMTASK_DB_PATH:
raise HandledException('PLOMTASK_DB_PATH not set.')
- db_file = DatabaseFile(PLOMTASK_DB_PATH)
- if not db_file.exists:
- legal_yesses = {'y', 'yes', 'yes.', 'yes!'}
- reply = input(DB_CREATION_ASK)
- if reply.lower() in legal_yesses:
- db_file.remake()
- else:
- print('Not recognizing reply as "yes".')
- raise HandledException('Cannot run without database.')
+ try:
+ db_file = DatabaseFile(PLOMTASK_DB_PATH)
+ except NotFoundException:
+ yes_or_fail(DB_CREATION_ASK, 'Cannot run without DB.')
+ db_file = DatabaseFile.create_at(PLOMTASK_DB_PATH)
+ except UnmigratedDbException:
+ yes_or_fail(DB_MIGRATE_ASK, 'Cannot run with unmigrated DB.')
+ db_file = DatabaseFile.migrate(PLOMTASK_DB_PATH)
server = TaskServer(db_file, ('localhost', HTTP_PORT), TaskHandler)
print(f'running at port {HTTP_PORT}')
try:
+++ /dev/null
-CREATE TABLE condition_descriptions (
- parent INTEGER NOT NULL,
- timestamp TEXT NOT NULL,
- description TEXT NOT NULL,
- PRIMARY KEY (parent, timestamp),
- FOREIGN KEY (parent) REFERENCES conditions(id)
-);
-CREATE TABLE condition_titles (
- parent INTEGER NOT NULL,
- timestamp TEXT NOT NULL,
- title TEXT NOT NULL,
- PRIMARY KEY (parent, timestamp),
- FOREIGN KEY (parent) REFERENCES conditions(id)
-);
-CREATE TABLE conditions (
- id INTEGER PRIMARY KEY,
- is_active BOOLEAN NOT NULL
-);
-CREATE TABLE days (
- id TEXT PRIMARY KEY,
- comment TEXT NOT NULL
-);
-CREATE TABLE process_conditions (
- process INTEGER NOT NULL,
- condition INTEGER NOT NULL,
- PRIMARY KEY (process, condition),
- FOREIGN KEY (process) REFERENCES processes(id),
- FOREIGN KEY (condition) REFERENCES conditions(id)
-);
-CREATE TABLE process_descriptions (
- parent INTEGER NOT NULL,
- timestamp TEXT NOT NULL,
- description TEXT NOT NULL,
- PRIMARY KEY (parent, timestamp),
- FOREIGN KEY (parent) REFERENCES processes(id)
-);
-CREATE TABLE process_disables (
- process INTEGER NOT NULL,
- condition INTEGER NOT NULL,
- PRIMARY KEY(process, condition),
- FOREIGN KEY (process) REFERENCES processes(id),
- FOREIGN KEY (condition) REFERENCES conditions(id)
-);
-CREATE TABLE process_efforts (
- parent INTEGER NOT NULL,
- timestamp TEXT NOT NULL,
- effort REAL NOT NULL,
- PRIMARY KEY (parent, timestamp),
- FOREIGN KEY (parent) REFERENCES processes(id)
-);
-CREATE TABLE process_enables (
- process INTEGER NOT NULL,
- condition INTEGER NOT NULL,
- PRIMARY KEY(process, condition),
- FOREIGN KEY (process) REFERENCES processes(id),
- FOREIGN KEY (condition) REFERENCES conditions(id)
-);
-CREATE TABLE process_steps (
- id INTEGER PRIMARY KEY,
- owner INTEGER NOT NULL,
- step_process INTEGER NOT NULL,
- parent_step INTEGER,
- FOREIGN KEY (owner) REFERENCES processes(id),
- FOREIGN KEY (step_process) REFERENCES processes(id),
- FOREIGN KEY (parent_step) REFERENCES process_steps(step_id)
-);
-CREATE TABLE process_titles (
- parent INTEGER NOT NULL,
- timestamp TEXT NOT NULL,
- title TEXT NOT NULL,
- PRIMARY KEY (parent, timestamp),
- FOREIGN KEY (parent) REFERENCES processes(id)
-);
-CREATE TABLE processes (
- id INTEGER PRIMARY KEY
-);
-CREATE TABLE todo_children (
- parent INTEGER NOT NULL,
- child INTEGER NOT NULL,
- PRIMARY KEY (parent, child),
- FOREIGN KEY (parent) REFERENCES todos(id),
- FOREIGN KEY (child) REFERENCES todos(id)
-);
-CREATE TABLE todo_conditions (
- todo INTEGER NOT NULL,
- condition INTEGER NOT NULL,
- PRIMARY KEY(todo, condition),
- FOREIGN KEY (todo) REFERENCES todos(id),
- FOREIGN KEY (condition) REFERENCES conditions(id)
-);
-CREATE TABLE todo_disables (
- todo INTEGER NOT NULL,
- condition INTEGER NOT NULL,
- PRIMARY KEY(todo, condition),
- FOREIGN KEY (todo) REFERENCES todos(id),
- FOREIGN KEY (condition) REFERENCES conditions(id)
-);
-CREATE TABLE todo_enables (
- todo INTEGER NOT NULL,
- condition INTEGER NOT NULL,
- PRIMARY KEY(todo, condition),
- FOREIGN KEY (todo) REFERENCES todos(id),
- FOREIGN KEY (condition) REFERENCES conditions(id)
-);
-CREATE TABLE todos (
- id INTEGER PRIMARY KEY,
- process INTEGER NOT NULL,
- is_done BOOLEAN NOT NULL,
- day TEXT NOT NULL,
- FOREIGN KEY (process) REFERENCES processes(id),
- FOREIGN KEY (day) REFERENCES days(id)
-);
#!/bin/sh
set -e
-# for dir in $(echo '.' 'plomtask' 'tests'); do
-for dir in $(echo 'tests'); do
+for dir in $(echo '.' 'plomtask' 'tests'); do
echo "Running mypy on ${dir}/ …."
python3 -m mypy --strict ${dir}/*.py
echo "Running flake8 on ${dir}/ …"
echo "Running pylint on ${dir}/ …"
python3 -m pylint ${dir}/*.py
done
-echo "Running unittest on tests/."
-python3 -m unittest tests/*.py
+echo "Running unittest-parallel on tests/."
+unittest-parallel -t . -s tests/ -p '*.py'
+set +e
+rm test_db:*.*
+set -e
exit 0
--- /dev/null
+<!DOCTYPE html>
+<html>
+<meta charset="UTF-8">
+<style>
+body {
+ font-family: monospace;
+ text-align: left;
+ padding: 0;
+}
+input.btn-harmless {
+ color: green;
+}
+input.btn-dangerous {
+ color: red;
+}
+div.btn-to-right {
+ float: right;
+ text-align: right;
+}
+td, th, tr, table {
+ vertical-align: top;
+ margin-top: 1em;
+ padding: 0;
+ border-collapse: collapse;
+}
+th, td {
+ padding-right: 1em;
+}
+a {
+ color: black;
+}
+{% block css %}
+{% endblock %}
+</style>
+<body>
+<a href="day">today</a>
+<a href="calendar">calendar</a>
+<a href="conditions">conditions</a>
+<a href="processes">processes</a>
+<a href="todos">todos</a>
+<hr>
+{% block content %}
+{% endblock %}
+</body>
+</html>
--- /dev/null
+{% macro edit_buttons() %}
+<input class="btn-harmless" type="submit" name="update" value="update" />
+<div class="btn-to-right">
+<input class="btn-dangerous" type="submit" name="delete" value="delete" />
+</div>
+{% endmacro %}
+
+
+
+{% macro datalist_of_titles(title, candidates) %}
+<datalist id="{{title}}">
+{% for candidate in candidates %}
+<option value="{{candidate.id_}}">{{candidate.title.newest|e}}</option>
+{% endfor %}
+</datalist>
+{% endmacro %}
+
+
+
+{% macro simple_checkbox_table(title, items, type_name, list_name, add_string="add", historical=false) %}
+<table>
+{% for item in items %}
+<tr>
+<td>
+<input type="checkbox" name="{{title}}" value="{{item.id_}}" checked />
+</td>
+<td>
+<a href="{{type_name}}?id={{item.id_}}">{% if historical is true %}{{item.title_then}}{% else %}{{item.title.newest|e}}{% endif %}</a>
+</td>
+</tr>
+{% endfor %}
+</table>
+{{add_string}}: <input name="{{title}}" list="{{list_name}}" autocomplete="off" />
+{% endmacro %}
+
+
+
+{% macro history_page(item_name, item, attribute_name, attribute, as_pre=false) %}
+<h3>{{item_name}} {{attribute_name}} history</h3>
+<table>
+
+<tr>
+<th>{{item_name}}</th>
+<td><a href="{{item_name}}?id={{item.id_}}">{{item.title.newest|e}}</a></td>
+</tr>
+
+{% for date in attribute.history.keys() | sort(reverse=True) %}
+<tr>
+<th>{{date | truncate(19, True, '') }}</th>
+<td>{% if as_pre %}<pre>{% endif %}{{attribute.history[date]}}{% if as_pre %}</pre>{% endif %}</td>
+</tr>
+{% endfor %}
+
+</table>
+{% endmacro %}
+++ /dev/null
-<!DOCTYPE html>
-<html>
-<meta charset="UTF-8">
-<style>
-body {
- font-family: monospace;
-}
-input.btn-harmless {
- color: green;
-}
-input.btn-dangerous {
- color: red;
-}
-div.btn-to-right {
- float: right;
- text-align: right;
-}
-{% block css %}
-{% endblock %}
-</style>
-<body>
-<a href="processes">processes</a>
-<a href="conditions">conditions</a>
-<a href="day">today</a>
-<a href="calendar">calendar</a>
-<hr>
-{% block content %}
-{% endblock %}
-</body>
-</html>
-{% extends 'base.html' %}
+{% extends '_base.html' %}
+
+
+
+{% block css %}
+tr.week_row td {
+ height: 0.3em;
+ background-color: black;
+ padding: 0;
+ margin: 0;
+ border-top: 0.2em solid white;
+}
+tr.month_row td {
+ border-top: 0.2em solid white;
+ color: white;
+ background-color: #555555;
+}
+table {
+ width: 100%;
+}
+tr.day_row td {
+ background-color: #cccccc;
+ border-top: 0.2em solid white;
+}
+td.day_name {
+ padding-right: 0.5em;
+}
+td.today {
+ font-weight: bold;
+}
+{% endblock %}
+
+
{% block content %}
+<h3>calendar</h3>
+
<form action="calendar" method="GET">
from <input name="start" value="{{start}}" />
to <input name="end" value="{{end}}" />
<input type="submit" value="OK" />
</form>
-<ul>
+<table>
{% for day in days %}
-<li><a href="day?date={{day.date}}">{{day.date}}</a> ({{day.weekday}}) {{day.comment|e}}
+
+{% if day.first_of_month %}
+<tr class="month_row">
+<td colspan=2>{{ day.month_name }}</td>
+</tr>
+{% endif %}
+
+{% if day.weekday == "Monday" %}
+<tr class="week_row">
+<td colspan=2></td>
+</tr>
+{% endif %}
+
+<tr class="day_row">
+<td class="day_name {% if day.date == today %}today{% endif %}"><a href="day?date={{day.date}}">{{day.weekday|truncate(2,True,'',0)}} {% if day.date == today %} {% endif %}{{day.date}}</a> {{day.comment|e}}</td>
+</tr>
+
+{% for todo in day.calendarized_todos %}
+<tr>
+<td>[{% if todo.is_done %}X{% else %} {% endif %}] <a href="todo?id={{todo.id_}}">{{todo.title_then|e}}</a>{% if todo.comment %} · {{todo.comment|e}}{% endif %}</td>
+</tr>
+{% endfor %}
+
{% endfor %}
-</ul>
+</table>
{% endblock %}
-{% extends 'base.html' %}
+{% extends '_base.html' %}
+{% import '_macros.html' as macros %}
+
+
{% block content %}
<h3>condition</h3>
<form action="condition?id={{condition.id_ or ''}}" method="POST">
-title: <input name="title" value="{{condition.title.newest|e}}" />
-description: <input name="description" value="{{condition.description.newest|e}}" />
-is active: <input name="is_active" type="checkbox" {% if condition.is_active %}checked{% endif %} />
+<table>
+
+<tr>
+<th>title</th>
+<td><input name="title" value="{{condition.title.newest|e}}" />{% if condition.id_ %} [<a href="condition_titles?id={{condition.id_}}">history</a>]{% endif %}</td>
+<tr/>
+
+<tr>
+<th>is active</th>
+<td><input name="is_active" type="checkbox" {% if condition.is_active %}checked{% endif %} /></td>
+<tr/>
+
+<tr>
+<th>description</th>
+<td><textarea name="description">{{condition.description.newest|e}}</textarea>{% if condition.id_ %} [<a href="condition_descriptions?id={{condition.id_}}">history</a>]{% endif %}</td>
+<tr/>
+
+<tr>
+<th>enables</th>
+<td>
+{% for process in enabled_processes %}
+<a href="process?id={{process.id_}}">{{process.title.newest|e}}</a><br />
+{% endfor %}
+</td>
+</tr>
+
+<tr>
+<th>disables</th>
+<td>
+{% for process in disabled_processes %}
+<a href="process?id={{process.id_}}">{{process.title.newest|e}}</a><br />
+{% endfor %}
+</td>
+</tr>
+
+<tr>
+<th>enabled by</th>
+<td>
+{% for process in enabling_processes %}
+<a href="process?id={{process.id_}}">{{process.title.newest|e}}</a><br />
+{% endfor %}
+</td>
+</tr>
-<input class="btn-harmless" type="submit" name="update" value="update" />
-<div class="btn-to-right">
-<input class="btn-dangerous" type="submit" name="delete" value="delete" />
-</div>
+<tr>
+<th>disabled by</th>
+<td>
+{% for process in disabling_processes %}
+<a href="process?id={{process.id_}}">{{process.title.newest|e}}</a><br />
+{% endfor %}
+</td>
+</tr>
+</table>
+{{ macros.edit_buttons() }}
{% endblock %}
--- /dev/null
+{% extends '_base.html' %}
+{% import '_macros.html' as macros %}
+
+
+
+{% block content %}
+{{ macros.history_page("condition", condition, "description", condition.description, true) }}
+{% endblock %}
--- /dev/null
+{% extends '_base.html' %}
+{% import '_macros.html' as macros %}
+
+
+
+{% block content %}
+{{ macros.history_page("condition", condition, "title", condition.title) }}
+{% endblock %}
-{% extends 'base.html' %}
+{% extends '_base.html' %}
{% block content %}
-<a href="condition">add</a>
-<ul>
+<h3>conditions</h3>
+
+<form action="conditions" method="GET">
+<input type="submit" value="filter" />
+<input name="pattern" value="{{pattern}}" />
+</form>
+
+<table>
+<tr>
+<th><a href="?sort_by={% if sort_by == "is_active" %}-{% endif %}is_active">active</a></th>
+<th><a href="?sort_by={% if sort_by == "title" %}-{% endif %}title">title</a></th>
+</tr>
{% for condition in conditions %}
-<li><a href="condition?id={{condition.id_}}">{{condition.title.newest}}</a>
+<tr>
+<td>[{% if condition.is_active %}X{% else %} {% endif %}]</td>
+<td><a href="condition?id={{condition.id_}}">{{condition.title.newest}}</a></td>
+</tr>
{% endfor %}
-</ul>
-{% endblock %}
+</table>
+<p>
+<a href="condition">add</a>
+</p>
+{% endblock %}
-{% extends 'base.html' %}
+{% extends '_base.html' %}
+{% import '_macros.html' as macros %}
-{% macro show_node(node, indent) %}
-{% if node.is_todo %}
-{% for i in range(indent) %} {% endfor %} +
-{% if node.seen %}({% else %}{% endif %}<a href="todo?id={{node.item.id_}}">{{node.item.process.title.newest|e}}</a>{% if node.seen %}){% else %}{% endif %}
-{% else %}
-{% for i in range(indent) %} {% endfor %} +
-{% if node.seen %}({% else %}{% endif %}<a href="condition?id={{node.item.id_}}">{{node.item.title.newest|e}}</a>{% if node.seen %}){% else %}{% endif %}
+
+{% block css %}
+td, th, tr, table {
+ padding: 0;
+ margin: 0;
+}
+table {
+ border-collapse: collapse;
+}
+th {
+ border: 1px solid black;
+}
+td.cond_line_0 {
+ border-top: 1px solid white;
+ background-color: #dddddd;
+}
+td.cond_line_1 {
+ border-top: 1px solid white;
+ background-color: #efefef;
+}
+td.cond_line_2 {
+ border-top: 1px solid white;
+ background-color: #fffff;
+}
+td.cond_line_corner {
+ max-width: 0px;
+ white-space: nowrap;
+ overflow: hidden;
+ text-overflow: clip;
+}
+td.todo_line {
+ border-bottom: 1px solid #dddddd;
+ height: 1.7em;
+}
+tr.inactive td.todo_line {
+ background-color: #dddddd;
+}
+td.left_border {
+ border-left: 1px solid black;
+}
+td.right_border {
+ border-right: 1px solid black;
+}
+input[type="text"] {
+ width: 98%;
+}
+input[name="day_comment"] {
+ width: 100em;
+}
+{% endblock %}
+
+
+
+{% macro show_node_undone(node, indent) %}
+{% if not node.todo.is_done %}
+<tr {% if node.seen or not node.todo.is_doable %}class="inactive"{% endif %}>
+{% if not node.seen %}
+<input type="hidden" name="todo_id" value="{{node.todo.id_}}" />
{% endif %}
-{% endmacro %}
+{% for condition in conditions_present %}
+<td class="cond_line_{{loop.index0 % 3}}">
+{% if condition in node.todo.conditions and not condition.is_active %}
+O
+{% elif condition in node.todo.blockers and condition.is_active %}
+!
+{% endif %}
+</td>
+{% endfor %}
-{% macro undone_with_children(node, indent) %}
-{% if not node.hide %}
-<tr>
-<td>
-{% if node.is_todo %}
-<input name="done" value="{{node.item.id_}}" type="checkbox" {% if node.seen or not node.item.is_doable %}disabled{% endif %} {% if node.item.is_done %} checked {% endif %} />
+{% if node.seen %}
+<td class="todo_line left_border"></td>
+<td class="todo_line">{% if node.todo.effort %}{{ node.todo.effort }}{% endif %}</td>
+{% else %}
+<td class="todo_line left_border"><input name="done" type="checkbox" value="{{node.todo.id_}}" {% if not node.todo.is_doable %}disabled{% endif %}/></td>
+<td class="todo_line"><input name="effort" type="number" step=0.1 size=5 placeholder={{node.todo.effort_then}} value={{node.todo.effort}} /></td>
{% endif %}
+<td class="todo_line right_border">
+{% for i in range(indent) %} {% endfor %} +
+{% if node.seen %}({% endif %}<a href="todo?id={{node.todo.id_}}">{{node.todo.title_then|e}}</a>{% if node.seen %}){% endif %}
</td>
-<td>
-{{ show_node(node, indent) }}
+
+{% for condition in conditions_present|reverse %}
+<td class="cond_line_{{(conditions_present|length - loop.index) % 3}}">{% if condition in node.todo.enables %} +{% elif condition in node.todo.disables %} !{% endif %}</td>
+{% endfor %}
+
+<td colspan=2>
+{% if node.seen %}
+{{node.todo.comment|e}}
+{% else %}
+<input name="comment" type="text" value="{{node.todo.comment|e}}" />
+{% endif %}
</td>
+
</tr>
{% endif %}
+
+{% if not node.seen %}
{% for child in node.children %}
-{{ undone_with_children(child, indent+1) }}
+{{ show_node_undone(child, indent+1) }}
{% endfor %}
+{% endif %}
+
{% endmacro %}
-{% macro done_with_children(node, indent) %}
-{% if not node.hide %}
+
+{% macro show_node_done(node, indent, path) %}
+{% if node.todo.is_done %}
+
+<tr>
+{% if path|length > 0 and not path[-1].todo.is_done %}
+<td>
+({% for path_node in path %}<a href="todo?id={{path_node.todo.id_}}">{{path_node.todo.title_then|e}}</a> <- {% endfor %})
+</td>
+</tr>
+
<tr>
<td>
-{{ show_node(node, indent) }}
+ +
+{% else %}
+<td>
+{% for i in range(indent) %} {% endfor %} +
+{% endif %}
+{% if node.seen %}({% endif %}<a href="todo?id={{node.todo.id_}}">{{node.todo.title_then|e}}</a> {% if node.todo.comment|length > 0 %}[{{node.todo.comment|e}}]{% endif %}{% if node.seen %}){% endif %}
</td>
</tr>
+
{% endif %}
+{% if not node.seen %}
{% for child in node.children %}
-{{ done_with_children(child, indent+1) }}
+{{ show_node_done(child, indent+1, path + [node]) }}
{% endfor %}
+{% endif %}
+
{% endmacro %}
+
{% block content %}
<h3>{{day.date}} / {{day.weekday}}</h3>
<p>
<a href="day?date={{day.prev_date}}">prev</a> | <a href="day?date={{day.next_date}}">next</a>
</p>
<form action="day?date={{day.date}}" method="POST">
-comment: <input name="comment" value="{{day.comment|e}}" />
-<input type="submit" value="OK" /><br />
-add todo: <input name="new_todo" list="processes" autocomplete="off" />
-<datalist id="processes">
-{% for process in processes %}
-<option value="{{process.id_}}">{{process.title.newest|e}}</option>
+
+<p>
+comment:
+<input name="day_comment" value="{{day.comment|e}}" />
+<input type="submit" value="OK" /></td>
+</p>
+
+<h4>to do</h4>
+
+<table>
+
+<tr>
+<th colspan={{ conditions_present|length + 3 + conditions_present|length }}>conditions</th>
+<th>add enabler</th>
+<th>add disabler</th>
+</tr>
+
+{% for condition in conditions_present %}
+{% set outer_loop = loop %}
+<tr>
+
+{% for _ in conditions_present %}
+{% if outer_loop.index > loop.index %}
+<td class="cond_line_{{loop.index0 % 3}}">
+{% elif outer_loop.index < loop.index %}
+<td class="cond_line_{{outer_loop.index0 % 3}}">
+{% else %}
+<td class="cond_line_{{outer_loop.index0 % 3}} cond_line_corner">×
+{% endif %}
+</td>
+{% endfor %}
+
+<td class="cond_line_{{loop.index0 % 3}}"><input type="checkbox" disabled{% if condition.is_active %} checked{% endif %}></td>
+<td colspan=2 class="cond_line_{{loop.index0 % 3}}"><a href="condition?id={{condition.id_}}">{{condition.title.at(day.date)|e}}</a></td>
+
+{% for _ in conditions_present %}
+{% if outer_loop.index0 + loop.index < conditions_present|length %}
+<td class="cond_line_{{outer_loop.index0 % 3}}">
+{% elif outer_loop.index0 + loop.index > conditions_present|length %}
+<td class="cond_line_{{(conditions_present|length - loop.index) % 3}}">
+{% else %}
+<td class="cond_line_{{outer_loop.index0 % 3}} cond_line_corner"> ×
+{% endif %}
{% endfor %}
-</datalist>
-<h4>conditions</h4>
-<ul>
-{% for node in condition_listings %}
-<li>[{% if node.condition.is_active %}x{% else %} {% endif %}] <a href="condition?id={{node.condition.id_}}">{{node.condition.title.newest|e}}</a>
-({% for enabler in node.enablers %}
-< {{enabler.process.title.newest|e}};
+
+{% set list_name = "todos_for_%s"|format(condition.id_) %}
+<td><input name="new_todo" list="{{list_name}}" autocomplete="off" /></td>
+{{ macros.datalist_of_titles(list_name, enablers_for[condition.id_]) }}
+</td>
+{% set list_name = "todos_against_%s"|format(condition.id_) %}
+<td><input name="new_todo" list="{{list_name}}" autocomplete="off" /></td>
+{{ macros.datalist_of_titles(list_name, disablers_for[condition.id_]) }}
+</td>
+</tr>
+{% endfor %}
+
+<tr>
+{% for condition in conditions_present %}
+<td class="cond_line_{{loop.index0 % 3}}"></td>
{% endfor %}
-{% for disabler in node.disablers %}
-! {{disabler.process.title.newest|e}};
-{% endfor %})
+<th colspan=3>doables</th>
+{% for condition in conditions_present %}
+<td class="cond_line_{{(conditions_present|length - loop.index) % 3}}"></td>
{% endfor %}
-</ul>
-<h4>to do</h4>
-<table>
-{% for node in todo_trees %}
-{{ undone_with_children(node, indent=0) }}
+<td colspan=2></td>
+</tr>
+<tr>
+{% for condition in conditions_present %}
+<td class="cond_line_{{loop.index0 % 3}}"></td>
+{% endfor %}
+<td class="left_border"></td>
+<td>add:</td>
+<td class="right_border" ><input type="text" name="new_todo" list="processes"></td>
+{% for condition in conditions_present %}
+<td class="cond_line_{{(conditions_present|length - loop.index) % 3}}"></td>
{% endfor %}
+<th colspan=2>comments</th>
+</tr>
+{% for node in top_nodes %}
+{{ show_node_undone(node, 0) }}
+{% endfor %}
+
</table>
+
<h4>done</h4>
+
<table>
-{% for node in done_trees %}
-{{ done_with_children(node, indent=0) }}
+{% for node in top_nodes %}
+{{ show_node_done(node, 0, []) }}
{% endfor %}
</table>
+
</form>
-{% endblock %}
+{{ macros.datalist_of_titles("processes", processes) }}
+{% endblock %}
-{% extends 'base.html' %}
+{% extends '_base.html' %}
+
+
{% block content %}
<p>{{msg}}</p>
-{% extends 'base.html' %}
+{% extends '_base.html' %}
+{% import '_macros.html' as macros %}
</td>
<td>
{% if step_node.is_explicit %}
-add step: <input name="new_step_to_{{step_id}}" list="candidates" autocomplete="off" />
+add sub-step: <input name="new_step_to_{{step_id}}" list="step_candidates" autocomplete="off" />
{% endif %}
</td>
</tr>
<h3>process</h3>
<form action="process?id={{process.id_ or ''}}" method="POST">
<table>
+
<tr>
<th>title</th>
-<td><input name="title" value="{{process.title.newest|e}}" /></td>
+<td><input name="title" value="{{process.title.newest|e}}" />{% if process.id_ %} [<a href="process_titles?id={{process.id_}}">history</a>]{% endif %}</td>
</tr>
+
<tr>
<th>default effort</th>
-<td><input name="effort" type="number" step=0.1 value={{process.effort.newest}} /></td>
+<td><input name="effort" type="number" step=0.1 value={{process.effort.newest}} />{% if process.id_ %} [<a href="process_efforts?id={{process.id_}}">history</a>]{% endif %}</td>
</tr>
+
<tr>
<th>description</th>
-<td><textarea name="description">{{process.description.newest|e}}</textarea></td>
+<td><textarea name="description">{{process.description.newest|e}}</textarea><br />{% if process.id_ %} [<a href="process_descriptions?id={{process.id_}}">history</a>]{% endif %}</td>
</tr>
+
<tr>
-<th>conditions</th>
-<td>
-<table>
-{% for condition in process.conditions %}
+<th>calendarize</th>
+<td><input type="checkbox" name="calendarize" {% if process.calendarize %}checked {% endif %}</td>
+</tr>
+
<tr>
-<td>
-<input type="checkbox" name="condition" value="{{condition.id_}}" checked />
-</td>
-<td>
-<a href="condition?id={{condition.id_}}">{{condition.title.newest|e}}</a>
-</td>
+<th>conditions</th>
+<td>{{ macros.simple_checkbox_table("condition", process.conditions, "condition", "condition_candidates") }}</td>
</tr>
-{% endfor %}
-</table>
-add condition: <input name="condition" list="condition_candidates" autocomplete="off" />
-</td>
+
+<tr>
+<th>blockers</th>
+<td>{{ macros.simple_checkbox_table("blocker", process.blockers, "condition", "condition_candidates") }}</td>
</tr>
+
<tr>
<th>enables</th>
-<td>
-<table>
-{% for condition in process.enables %}
-<tr>
-<td>
-<input type="checkbox" name="enables" value="{{condition.id_}}" checked />
-</td>
-<td>
-<a href="condition?id={{condition.id_}}">{{condition.title.newest|e}}</a>
-</td>
-</tr>
-{% endfor %}
-</table>
-add enables: <input name="enables" list="condition_candidates" autocomplete="off" />
-</td>
+<td>{{ macros.simple_checkbox_table("enables", process.enables, "condition", "condition_candidates") }}</td>
</tr>
+
<tr>
<th>disables</th>
-<td>
-<table>
-{% for condition in process.disables %}
-<tr>
-<td>
-<input type="checkbox" name="disables" value="{{condition.id_}}" checked />
-</td>
-<td>
-<a href="condition?id={{condition.id_}}">{{condition.title.newest|e}}</a>
-</td>
-</tr>
-{% endfor %}
-</table>
-add disables: <input name="disables" list="condition_candidates" autocomplete="off" />
-</td>
+<td>{{ macros.simple_checkbox_table("disables", process.disables, "condition", "condition_candidates") }}</td>
</tr>
+
<tr>
<th>steps</th>
<td>
{{ step_with_steps(step_id, step_node, 0) }}
{% endfor %}
</table>
-add step: <input name="new_top_step" list="step_candidates" autocomplete="off" />
+add: <input name="new_top_step" list="step_candidates" autocomplete="off" />
</td>
<tr>
-</table>
-<datalist id="condition_candidates">
-{% for condition_candidate in condition_candidates %}
-<option value="{{condition_candidate.id_}}">{{condition_candidate.title.newest|e}}</option>
-{% endfor %}
-</datalist>
-<datalist id="step_candidates">
-{% for candidate in step_candidates %}
-<option value="{{candidate.id_}}">{{candidate.title.newest|e}}</option>
-{% endfor %}
-</datalist>
-<input class="btn-harmless" type="submit" name="update" value="update" />
-<div class="btn-to-right">
-<input class="btn-dangerous" type="submit" name="delete" value="delete" />
-</div>
-</form>
-<h4>step of</h4>
-<ul>
+
+<tr>
+<th>step of</th>
+<td>
{% for owner in owners %}
-<li><a href="process?id={{owner.id_}}">{{owner.title.newest|e}}</a>
+<a href="process?id={{owner.id_}}">{{owner.title.newest|e}}</a><br />
{% endfor %}
-</ul>
+</td>
+<tr>
+
+<tr>
+<th>todos</th>
+<td>
+<a href="todos?process_id={{process.id_}}">{{n_todos}}</a><br />
+</td>
+<tr>
+
+</table>
+{{ macros.edit_buttons() }}
+</form>
+
+{{ macros.datalist_of_titles("condition_candidates", condition_candidates) }}
+{{ macros.datalist_of_titles("step_candidates", step_candidates) }}
{% endblock %}
--- /dev/null
+{% extends '_base.html' %}
+{% import '_macros.html' as macros %}
+
+
+
+{% block content %}
+{{ macros.history_page("process", process, "description", process.description, as_pre=true) }}
+{% endblock %}
+
--- /dev/null
+{% extends '_base.html' %}
+{% import '_macros.html' as macros %}
+
+
+
+{% block content %}
+{{ macros.history_page("process", process, "effort", process.effort) }}
+{% endblock %}
--- /dev/null
+{% extends '_base.html' %}
+{% import '_macros.html' as macros %}
+
+
+
+{% block content %}
+{{ macros.history_page("process", process, "title", process.title) }}
+{% endblock %}
-{% extends 'base.html' %}
+{% extends '_base.html' %}
{% block content %}
-<a href="process">add</a>
-<ul>
+<h3>processes</h3>
+
+<form action="processes" method="GET">
+<input type="submit" value="filter" />
+<input name="pattern" value="{{pattern}}" />
+</form>
+
+<table>
+<tr>
+<th><a href="?sort_by={% if sort_by == "steps" %}-{% endif %}steps">steps</a></th>
+<th><a href="?sort_by={% if sort_by == "effort" %}-{% endif %}effort">effort</a></th>
+<th><a href="?sort_by={% if sort_by == "title" %}-{% endif %}title">title</a></th>
+</tr>
{% for process in processes %}
-<li><a href="process?id={{process.id_}}">{{process.title.newest}}</a>
+<tr>
+<td>{{ process.explicit_steps|count }}</td>
+<td>{{ process.effort.newest }}</td>
+<td><a href="process?id={{process.id_}}">{{process.title.newest}}</a></td>
+</tr>
{% endfor %}
-</ul>
-{% endblock %}
+</table>
+<p>
+<a href="process">add</a>
+</p>
+{% endblock %}
-{% extends 'base.html' %}
+{% extends '_base.html' %}
+{% import '_macros.html' as macros %}
+
+
{% block content %}
-<h3>Todo: {{todo.process.title.newest|e}}</h3>
+<h3>Todo: {{todo.title_then|e}}</h3>
<form action="todo?id={{todo.id_}}" method="POST">
-<p>
-id: {{todo.id_}}<br />
-day: <a href="day?date={{todo.date}}">{{todo.date}}</a><br />
-process: <a href="process?id={{todo.process.id_}}">{{todo.process.title.newest|e}}</a><br />
-done: <input type="checkbox" name="done" {% if todo.is_done %}checked {% endif %} {% if not todo.is_doable %}disabled {% endif %}/><br />
-</p>
-<h4>conditions</h4>
<table>
-{% for condition in todo.conditions %}
+
<tr>
-<td>
-<input type="checkbox" name="condition" value="{{condition.id_}}" checked />
-</td>
-<td>
-<a href="condition?id={{condition.id_}}">{{condition.title.newest|e}}</a>
-</td>
+<th>day</th>
+<td><a href="day?date={{todo.date}}">{{todo.date}}</a></td>
</tr>
-{% endfor %}
-</table>
-add condition: <input name="condition" list="condition_candidates" autocomplete="off" />
-<datalist id="condition_candidates">
-{% for condition_candidate in condition_candidates %}
-<option value="{{condition_candidate.id_}}">{{condition_candidate.title.newest|e}}</option>
-{% endfor %}
-</datalist>
-<h4>enables</h4>
-<table>
-{% for condition in todo.enables %}
+
<tr>
-<td>
-<input type="checkbox" name="enables" value="{{condition.id_}}" checked />
-</td>
-<td>
-<a href="condition?id={{condition.id_}}">{{condition.title.newest|e}}</a>
-</td>
+<th>process</th>
+<td><a href="process?id={{todo.process.id_}}">{{todo.process.title.newest|e}}</a></td>
</tr>
-{% endfor %}
-</table>
-add enables: <input name="enables" list="condition_candidates" autocomplete="off" />
-<h4>disables</h4>
-<table>
-{% for condition in todo.disables%}
+
<tr>
+<th>done</th>
+<td><input type="checkbox" name="done" {% if todo.is_done %}checked {% endif %} {% if not todo.is_doable %}disabled {% endif %}/><br /></td>
+</tr>
+
+<tr>
+<th>effort</th>
+<td><input type="number" name="effort" step=0.1 size=5 placeholder={{todo.effort_then}} value={{todo.effort}} /><br /></td>
+</tr>
+
+<tr>
+<th>comment</th>
+<td><input name="comment" value="{{todo.comment|e}}"/></td>
+</tr>
+
+<tr>
+<th>calendarize</th>
+<td><input type="checkbox" name="calendarize" {% if todo.calendarize %}checked {% endif %}</td>
+</tr>
+
+<tr>
+<th>conditions</th>
+<td>{{ macros.simple_checkbox_table("condition", todo.conditions, "condition", "condition_candidates") }}</td>
+</tr>
+
+<tr>
+<th>blockers</th>
+<td>{{ macros.simple_checkbox_table("blocker", todo.blockers, "condition", "condition_candidates") }}</td>
+</tr>
+
+<tr>
+<th>enables</th>
+<td>{{ macros.simple_checkbox_table("enables", todo.enables, "condition", "condition_candidates") }}</td>
+</tr>
+
+<tr>
+<th>disables</th>
+<td>{{ macros.simple_checkbox_table("disables", todo.disables, "condition", "condition_candidates") }}</td>
+</tr>
+
+<tr>
+<th>parents</th>
<td>
-<input type="checkbox" name="disables" value="{{condition.id_}}" checked />
-</td>
-<td>
-<a href="condition?id={{condition.id_}}">{{condition.title.newest|e}}</a>
+{% for parent in todo.parents %}
+<a href="todo?id={{parent.id_}}">{{parent.title_then|e}}</a><br />
+{% endfor %}
</td>
</tr>
-{% endfor %}
+
+<tr>
+<th>children</th>
+<td>{{ macros.simple_checkbox_table("adopt", todo.children, "todo", "todo_candidates", "adopt", true) }}</td>
+</tr>
+
</table>
-add disables: <input name="disables" list="condition_candidates" autocomplete="off" />
-<h4>parents</h4>
-<ul>
-{% for parent in todo.parents %}
-<li><a href="todo?id={{parent.id_}}">{{parent.process.title.newest|e}}</a>
-{% endfor %}
-</ul>
-<h4>children</h4>
-<ul>
-{% for child in todo.children %}
-<li><input type="checkbox" name="adopt" value="{{child.id_}}" checked />
-<a href="todo?id={{child.id_}}">{{child.process.title.newest|e}}</a>
-{% endfor %}
-</ul>
-adopt: <input name="adopt" list="todo_candidates" autocomplete="off" />
+{{ macros.edit_buttons() }}
+</form>
+
+{{ macros.datalist_of_titles("condition_candidates", condition_candidates) }}
<datalist id="todo_candidates">
{% for candidate in todo_candidates %}
-<option value="{{candidate.id_}}">{{candidate.process.title.newest|e}} ({{candidate.id_}})</option>
+<option value="{{candidate.id_}}">{{candidate.title.newest|e}} {{candidate.comment|e}}</option>
{% endfor %}
</datalist>
-
-<input class="btn-harmless" type="submit" name="update" value="update" />
-<div class="btn-to-right">
-<input class="btn-dangerous" type="submit" name="delete" value="delete" />
-</div>
-
-</form
{% endblock %}
--- /dev/null
+{% extends '_base.html' %}
+{% import '_macros.html' as macros %}
+
+
+
+{% block content %}
+<h3>todos</h3>
+
+<form action="todos" method="GET">
+<input type="submit" value="filter" />
+process <input name="process_id" value="{{process_id or ''}}" list="processes" />
+from <input name="start" value="{{start}}" />
+to <input name="end" value="{{end}}" />
+in comment <input name="comment_pattern" value="{{comment_pattern}}" />
+<input type="submit" value="OK" />
+</form>
+
+<table>
+<tr>
+<th><a href="?sort_by={% if sort_by == "doneness" %}-{% endif %}doneness">done</a></th>
+<th><a href="?sort_by={% if sort_by == "date" %}-{% endif %}date">date</a></th>
+<th><a href="?sort_by={% if sort_by == "title" %}-{% endif %}title">title</a></th>
+<th><a href="?sort_by={% if sort_by == "comment" %}-{% endif %}comment">comment</a></th>
+</tr>
+{% for todo in todos %}
+<tr>
+<td>[{% if todo.is_done %}x{% else %} {% endif %}]</td>
+<td><a href="day?date={{todo.date}}">{{todo.date}}</a></td>
+<td><a href="todo?id={{todo.id_}}">{{todo.title_then}}</a></td>
+<td>{{todo.comment}}</td>
+</tr>
+{% endfor %}
+</table>
+{{ macros.datalist_of_titles("processes", all_processes) }}
+{% endblock %}
+
class TestsSansDB(TestCaseSansDB):
"""Tests requiring no DB setup."""
checked_class = Condition
-
- def test_Condition_id_setting(self) -> None:
- """Test .id_ being set and its legal range being enforced."""
- self.check_id_setting()
-
- def test_Condition_versioned_defaults(self) -> None:
- """Test defaults of VersionedAttributes."""
- self.check_versioned_defaults({
- 'title': 'UNNAMED',
- 'description': ''})
+ do_id_test = True
+ versioned_defaults_to_test = {'title': 'UNNAMED', 'description': ''}
class TestsWithDB(TestCaseWithDB):
"""Tests requiring DB, but not server setup."""
checked_class = Condition
-
- def test_Condition_saving_and_caching(self) -> None:
- """Test .save/.save_core."""
- kwargs = {'id_': 1, 'is_active': False}
- self.check_saving_and_caching(**kwargs)
- # check .id_ set if None, and versioned attributes too
- c = Condition(None)
- c.save(self.db_conn)
- self.assertEqual(c.id_, 2)
- self.check_saving_of_versioned('title', str)
- self.check_saving_of_versioned('description', str)
+ default_init_kwargs = {'is_active': False}
+ test_versioneds = {'title': str, 'description': str}
def test_Condition_from_table_row(self) -> None:
"""Test .from_table_row() properly reads in class from DB"""
from unittest import TestCase
from datetime import datetime
from tests.utils import TestCaseWithDB, TestCaseWithServer
-from plomtask.days import Day, todays_date
+from plomtask.dating import date_in_n_days
+from plomtask.days import Day
from plomtask.exceptions import BadFormatException
checked_class = Day
default_ids = ('2024-01-01', '2024-01-02', '2024-01-03')
- def test_Day_saving_and_caching(self) -> None:
- """Test .save/.save_core."""
+ def test_saving_and_caching(self) -> None:
+ """Test storage of instances.
+
+ We don't use the parent class's method here because the checked class
+ has too different a handling of IDs.
+ """
kwargs = {'date': self.default_ids[0], 'comment': 'foo'}
self.check_saving_and_caching(**kwargs)
"""Test .by_id()."""
self.check_by_id()
- def test_Day_all(self) -> None:
- """Test Day.all(), especially in regards to date range filtering."""
+ def test_Day_by_date_range_filled(self) -> None:
+ """Test Day.by_date_range_filled."""
date1, date2, date3 = self.default_ids
day1, day2, day3 = self.check_all()
- self.assertEqual(Day.all(self.db_conn, ('', '')),
- [day1, day2, day3])
# check date range is a closed interval
- self.assertEqual(Day.all(self.db_conn, (date1, date3)),
+ self.assertEqual(Day.by_date_range_filled(self.db_conn, date1, date3),
[day1, day2, day3])
# check first date range value excludes what's earlier
- self.assertEqual(Day.all(self.db_conn, (date2, date3)),
+ self.assertEqual(Day.by_date_range_filled(self.db_conn, date2, date3),
[day2, day3])
- self.assertEqual(Day.all(self.db_conn, (date3, '')),
- [day3])
# check second date range value excludes what's later
- self.assertEqual(Day.all(self.db_conn, ('', date2)),
+ self.assertEqual(Day.by_date_range_filled(self.db_conn, date1, date2),
[day1, day2])
# check swapped (impossible) date range returns emptiness
- self.assertEqual(Day.all(self.db_conn, (date3, date1)),
+ self.assertEqual(Day.by_date_range_filled(self.db_conn, date3, date1),
[])
# check fill_gaps= instantiates unsaved dates within date range
# (but does not store them)
- day4 = Day('2024-01-04')
day5 = Day('2024-01-05')
day6 = Day('2024-01-06')
day6.save(self.db_conn)
- self.assertEqual(Day.all(self.db_conn, (date2, '2024-01-07'),
- fill_gaps=True),
- [day2, day3, day4, day5, day6])
+ day7 = Day('2024-01-07')
+ self.assertEqual(Day.by_date_range_filled(self.db_conn,
+ day5.date, day7.date),
+ [day5, day6, day7])
self.check_storage([day1, day2, day3, day6])
# check 'today' is interpreted as today's date
- today = Day(todays_date())
+ today = Day(date_in_n_days(0))
today.save(self.db_conn)
- self.assertEqual(Day.all(self.db_conn, ('today', 'today')), [today])
+ self.assertEqual(Day.by_date_range_filled(self.db_conn,
+ 'today', 'today'),
+ [today])
def test_Day_remove(self) -> None:
"""Test .remove() effects on DB and cache."""
def test_Day_singularity(self) -> None:
"""Test pointers made for single object keep pointing to it."""
- self.check_singularity('comment', 'boo')
+ self.check_singularity('day_comment', 'boo')
class TestsWithServer(TestCaseWithServer):
def test_do_POST_day(self) -> None:
"""Test POST /day."""
- form_data = {'comment': ''}
+ form_data = {'day_comment': ''}
self.check_post(form_data, '/day', 400)
self.check_post(form_data, '/day?date=foo', 400)
self.check_post(form_data, '/day?date=2024-01-01', 302)
class TestsSansDB(TestCaseSansDB):
"""Module tests not requiring DB setup."""
checked_class = Process
-
- def test_Process_id_setting(self) -> None:
- """Test .id_ being set and its legal range being enforced."""
- self.check_id_setting()
-
- def test_Process_versioned_defaults(self) -> None:
- """Test defaults of VersionedAttributes."""
- self.check_versioned_defaults({
- 'title': 'UNNAMED',
- 'description': '',
- 'effort': 1.0})
+ do_id_test = True
+ versioned_defaults_to_test = {'title': 'UNNAMED', 'description': '',
+ 'effort': 1.0}
class TestsSansDBProcessStep(TestCaseSansDB):
"""Module tests not requiring DB setup."""
checked_class = ProcessStep
-
- def test_ProcessStep_id_setting(self) -> None:
- """Test .id_ being set and its legal range being enforced."""
- self.check_id_setting(2, 3, 4)
+ do_id_test = True
+ default_init_args = [2, 3, 4]
class TestsWithDB(TestCaseWithDB):
"""Module tests requiring DB setup."""
checked_class = Process
+ test_versioneds = {'title': str, 'description': str, 'effort': float}
def three_processes(self) -> tuple[Process, Process, Process]:
"""Return three saved processes."""
p.save(self.db_conn)
return p, set_1, set_2, set_3
- def test_Process_saving_and_caching(self) -> None:
+ def test_Process_conditions_saving(self) -> None:
"""Test .save/.save_core."""
- kwargs = {'id_': 1}
- self.check_saving_and_caching(**kwargs)
- self.check_saving_of_versioned('title', str)
- self.check_saving_of_versioned('description', str)
- self.check_saving_of_versioned('effort', float)
p, set1, set2, set3 = self.p_of_conditions()
p.uncache()
r = Process.by_id(self.db_conn, p.id_)
assert isinstance(p2.id_, int)
assert isinstance(p3.id_, int)
steps_p1: list[tuple[int | None, int, int | None]] = []
+ # add step of process p2 as first (top-level) step to p1
add_step(p1, steps_p1, (None, p2.id_, None), 1)
p1_dict: dict[int, ProcessStepsNode] = {}
p1_dict[1] = ProcessStepsNode(p2, None, True, {}, False)
self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # add step of process p3 as second (top-level) step to p1
add_step(p1, steps_p1, (None, p3.id_, None), 2)
step_2 = p1.explicit_steps[-1]
p1_dict[2] = ProcessStepsNode(p3, None, True, {}, False)
self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # add step of process p3 as first (top-level) step to p2,
+ # expect it as implicit sub-step of p1's second (p3) step
steps_p2: list[tuple[int | None, int, int | None]] = []
add_step(p2, steps_p2, (None, p3.id_, None), 3)
p1_dict[1].steps[3] = ProcessStepsNode(p3, None, False, {}, False)
self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # add step of process p2 as explicit sub-step to p1's first sub-step
add_step(p1, steps_p1, (None, p2.id_, step_2.id_), 4)
step_3 = ProcessStepsNode(p3, None, False, {}, True)
p1_dict[2].steps[4] = ProcessStepsNode(p2, step_2.id_, True,
{3: step_3}, False)
self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # add step of process p3 as explicit sub-step to non-existing p1
+ # sub-step (of id=999), expect it to become another p1 top-level step
add_step(p1, steps_p1, (None, p3.id_, 999), 5)
p1_dict[5] = ProcessStepsNode(p3, None, True, {}, False)
self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
+ # add step of process p3 as explicit sub-step to p1's implicit p3
+ # sub-step, expect it to become another p1 top-level step
add_step(p1, steps_p1, (None, p3.id_, 3), 6)
p1_dict[6] = ProcessStepsNode(p3, None, True, {}, False)
self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
self.assertEqual(p1.used_as_step_by(self.db_conn), [])
self.assertEqual(p2.used_as_step_by(self.db_conn), [p1])
self.assertEqual(p3.used_as_step_by(self.db_conn), [p1, p2])
+ # add step of process p2 as explicit sub-step to p1's second (p3)
+ # top-level step
+ add_step(p1, steps_p1, (None, p3.id_, 2), 7)
+ p1_dict[2].steps[7] = ProcessStepsNode(p3, 2, True, {}, False)
+ # import pprint
+ # pprint.pp(p1.get_steps(self.db_conn, None))
+ # pprint.pp(p1_dict)
+ self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
def test_Process_conditions(self) -> None:
"""Test setting Process.conditions/enables/disables."""
class TestsWithDBForProcessStep(TestCaseWithDB):
"""Module tests requiring DB setup."""
checked_class = ProcessStep
-
- def test_ProcessStep_saving_and_caching(self) -> None:
- """Test .save/.save_core."""
- kwargs = {'id_': 1,
- 'owner_id': 2,
- 'step_process_id': 3,
- 'parent_step_id': 4}
- self.check_saving_and_caching(**kwargs)
+ default_init_kwargs = {'owner_id': 2, 'step_process_id': 3,
+ 'parent_step_id': 4}
def test_ProcessStep_from_table_row(self) -> None:
"""Test .from_table_row() properly reads in class from DB"""
self.check_post(form_data, '/process?id=6', 404)
self.check_post(form_data, '/process?id=5', 302, '/processes')
+ def test_do_POST_process_steps(self) -> None:
+ """Test behavior of ProcessStep posting."""
+ # pylint: disable=too-many-statements
+ form_data_1 = self.post_process(1)
+ self.post_process(2)
+ self.post_process(3)
+ # post first (top-level) step of process 2 to process 1
+ form_data_1['new_top_step'] = [2]
+ self.post_process(1, form_data_1)
+ retrieved_process = Process.by_id(self.db_conn, 1)
+ self.assertEqual(len(retrieved_process.explicit_steps), 1)
+ retrieved_step = retrieved_process.explicit_steps[0]
+ self.assertEqual(retrieved_step.step_process_id, 2)
+ self.assertEqual(retrieved_step.owner_id, 1)
+ self.assertEqual(retrieved_step.parent_step_id, None)
+ # post empty steps list to process, expect clean slate, and old step to
+ # completely disappear
+ form_data_1['new_top_step'] = []
+ self.post_process(1, form_data_1)
+ retrieved_process = Process.by_id(self.db_conn, 1)
+ self.assertEqual(retrieved_process.explicit_steps, [])
+ with self.assertRaises(NotFoundException):
+ ProcessStep.by_id(self.db_conn, retrieved_step.id_)
+ # post new first (top_level) step of process 3 to process 1
+ form_data_1['new_top_step'] = [3]
+ self.post_process(1, form_data_1)
+ retrieved_process = Process.by_id(self.db_conn, 1)
+ retrieved_step = retrieved_process.explicit_steps[0]
+ self.assertEqual(retrieved_step.step_process_id, 3)
+ self.assertEqual(retrieved_step.owner_id, 1)
+ self.assertEqual(retrieved_step.parent_step_id, None)
+ # post to process steps list without keeps, expect clean slate
+ form_data_1['new_top_step'] = []
+ form_data_1['steps'] = [retrieved_step.id_]
+ self.post_process(1, form_data_1)
+ retrieved_process = Process.by_id(self.db_conn, 1)
+ self.assertEqual(retrieved_process.explicit_steps, [])
+ # post to process empty steps list but keep, expect 400
+ form_data_1['steps'] = []
+ form_data_1['keep_step'] = [retrieved_step.id_]
+ self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
+ # post to process steps list with keep on non-created step, expect 400
+ form_data_1['steps'] = [retrieved_step.id_]
+ form_data_1['keep_step'] = [retrieved_step.id_]
+ self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
+ # post to process steps list with keep and process ID, expect 200
+ form_data_1[f'step_{retrieved_step.id_}_process_id'] = [2]
+ self.post_process(1, form_data_1)
+ retrieved_process = Process.by_id(self.db_conn, 1)
+ self.assertEqual(len(retrieved_process.explicit_steps), 1)
+ retrieved_step = retrieved_process.explicit_steps[0]
+ self.assertEqual(retrieved_step.step_process_id, 2)
+ self.assertEqual(retrieved_step.owner_id, 1)
+ self.assertEqual(retrieved_step.parent_step_id, None)
+ # post nonsensical new_top_step id and otherwise zero'd steps, expect
+ # 400 and preservation of previous state
+ form_data_1['new_top_step'] = ['foo']
+ form_data_1['steps'] = []
+ form_data_1['keep_step'] = []
+ self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
+ retrieved_process = Process.by_id(self.db_conn, 1)
+ self.assertEqual(len(retrieved_process.explicit_steps), 1)
+ retrieved_step = retrieved_process.explicit_steps[0]
+ self.assertEqual(retrieved_step.step_process_id, 2)
+ self.assertEqual(retrieved_step.owner_id, 1)
+ self.assertEqual(retrieved_step.parent_step_id, None)
+ # post to process steps list with keep and process ID, expect 200
+ form_data_1['new_top_step'] = [3]
+ form_data_1['steps'] = [retrieved_step.id_]
+ form_data_1['keep_step'] = [retrieved_step.id_]
+ self.post_process(1, form_data_1)
+ retrieved_process = Process.by_id(self.db_conn, 1)
+ self.assertEqual(len(retrieved_process.explicit_steps), 2)
+ retrieved_step_0 = retrieved_process.explicit_steps[0]
+ self.assertEqual(retrieved_step_0.step_process_id, 2)
+ self.assertEqual(retrieved_step_0.owner_id, 1)
+ self.assertEqual(retrieved_step_0.parent_step_id, None)
+ retrieved_step_1 = retrieved_process.explicit_steps[1]
+ self.assertEqual(retrieved_step_1.step_process_id, 3)
+ self.assertEqual(retrieved_step_1.owner_id, 1)
+ self.assertEqual(retrieved_step_1.parent_step_id, None)
+ # post to process steps list with keeps etc., but trigger recursion
+ form_data_1['new_top_step'] = []
+ form_data_1['steps'] = [retrieved_step_0.id_, retrieved_step_1.id_]
+ form_data_1['keep_step'] = [retrieved_step_0.id_, retrieved_step_1.id_]
+ form_data_1[f'step_{retrieved_step_0.id_}_process_id'] = [2]
+ form_data_1[f'step_{retrieved_step_1.id_}_process_id'] = [1]
+ self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
+ # check previous status preserved despite failed steps setting
+ retrieved_process = Process.by_id(self.db_conn, 1)
+ self.assertEqual(len(retrieved_process.explicit_steps), 2)
+ retrieved_step_0 = retrieved_process.explicit_steps[0]
+ self.assertEqual(retrieved_step_0.step_process_id, 2)
+ self.assertEqual(retrieved_step_0.owner_id, 1)
+ self.assertEqual(retrieved_step_0.parent_step_id, None)
+ retrieved_step_1 = retrieved_process.explicit_steps[1]
+ self.assertEqual(retrieved_step_1.step_process_id, 3)
+ self.assertEqual(retrieved_step_1.owner_id, 1)
+ self.assertEqual(retrieved_step_1.parent_step_id, None)
+ form_data_1[f'step_{retrieved_step_1.id_}_process_id'] = [3]
+ # post sub-step to step
+ form_data_1[f'new_step_to_{retrieved_step_1.id_}'] = [3]
+ self.post_process(1, form_data_1)
+ retrieved_process = Process.by_id(self.db_conn, 1)
+ self.assertEqual(len(retrieved_process.explicit_steps), 3)
+ retrieved_step_0 = retrieved_process.explicit_steps[0]
+ self.assertEqual(retrieved_step_0.step_process_id, 2)
+ self.assertEqual(retrieved_step_0.owner_id, 1)
+ self.assertEqual(retrieved_step_0.parent_step_id, None)
+ retrieved_step_1 = retrieved_process.explicit_steps[1]
+ self.assertEqual(retrieved_step_1.step_process_id, 3)
+ self.assertEqual(retrieved_step_1.owner_id, 1)
+ self.assertEqual(retrieved_step_1.parent_step_id, None)
+ retrieved_step_2 = retrieved_process.explicit_steps[2]
+ self.assertEqual(retrieved_step_2.step_process_id, 3)
+ self.assertEqual(retrieved_step_2.owner_id, 1)
+ self.assertEqual(retrieved_step_2.parent_step_id, retrieved_step_1.id_)
+
def test_do_GET(self) -> None:
"""Test /process and /processes response codes."""
- self.post_process()
self.check_get_defaults('/process')
self.check_get('/processes', 200)
"""Test Todos module."""
from tests.utils import TestCaseWithDB, TestCaseWithServer
-from plomtask.todos import Todo, TodoStepsNode
+from plomtask.todos import Todo, TodoNode
from plomtask.processes import Process
from plomtask.conditions import Condition
from plomtask.exceptions import (NotFoundException, BadFormatException,
class TestsWithDB(TestCaseWithDB):
"""Tests requiring DB, but not server setup."""
+ checked_class = Todo
+ default_init_kwargs = {'process': None, 'is_done': False,
+ 'date': '2024-01-01'}
def setUp(self) -> None:
super().setUp()
self.cond1.save(self.db_conn)
self.cond2 = Condition(None)
self.cond2.save(self.db_conn)
+ self.default_init_kwargs['process'] = self.proc
- def test_Todo_by_id(self) -> None:
- """Test creation and findability of Todos."""
- process_unsaved = Process(None)
+ def test_Todo_init(self) -> None:
+ """Test creation of Todo and what they default to."""
+ process = Process(None)
with self.assertRaises(NotFoundException):
- todo = Todo(None, process_unsaved, False, self.date1)
- process_unsaved.save(self.db_conn)
- todo = Todo(None, process_unsaved, False, self.date1)
+ Todo(None, process, False, self.date1)
+ process.save(self.db_conn)
+ assert isinstance(self.cond1.id_, int)
+ assert isinstance(self.cond2.id_, int)
+ process.set_conditions(self.db_conn, [self.cond1.id_, self.cond2.id_])
+ process.set_enables(self.db_conn, [self.cond1.id_])
+ process.set_disables(self.db_conn, [self.cond2.id_])
+ todo_no_id = Todo(None, process, False, self.date1)
+ self.assertEqual(todo_no_id.conditions, [self.cond1, self.cond2])
+ self.assertEqual(todo_no_id.enables, [self.cond1])
+ self.assertEqual(todo_no_id.disables, [self.cond2])
+ todo_yes_id = Todo(5, process, False, self.date1)
+ self.assertEqual(todo_yes_id.conditions, [])
+ self.assertEqual(todo_yes_id.enables, [])
+ self.assertEqual(todo_yes_id.disables, [])
+
+ def test_Todo_by_id(self) -> None:
+ """Test findability of Todos."""
+ todo = Todo(1, self.proc, False, self.date1)
todo.save(self.db_conn)
self.assertEqual(Todo.by_id(self.db_conn, 1), todo)
with self.assertRaises(NotFoundException):
t2.save(self.db_conn)
self.assertEqual(Todo.by_date(self.db_conn, self.date1), [t1, t2])
self.assertEqual(Todo.by_date(self.db_conn, self.date2), [])
- self.assertEqual(Todo.by_date(self.db_conn, 'foo'), [])
-
- def test_Todo_from_process(self) -> None:
- """Test spawning of Todo attributes from Process."""
- assert isinstance(self.cond1.id_, int)
- assert isinstance(self.cond2.id_, int)
- self.proc.set_conditions(self.db_conn, [self.cond1.id_])
- todo = Todo(None, self.proc, False, self.date1)
- self.assertEqual(todo.conditions, [self.cond1])
- todo.set_conditions(self.db_conn, [self.cond2.id_])
- self.assertEqual(todo.conditions, [self.cond2])
- self.assertEqual(self.proc.conditions, [self.cond1])
- self.proc.set_enables(self.db_conn, [self.cond1.id_])
- todo = Todo(None, self.proc, False, self.date1)
- self.assertEqual(todo.enables, [self.cond1])
- todo.set_enables(self.db_conn, [self.cond2.id_])
- self.assertEqual(todo.enables, [self.cond2])
- self.assertEqual(self.proc.enables, [self.cond1])
- self.proc.set_disables(self.db_conn, [self.cond1.id_])
- todo = Todo(None, self.proc, False, self.date1)
- self.assertEqual(todo.disables, [self.cond1])
- todo.set_disables(self.db_conn, [self.cond2.id_])
- self.assertEqual(todo.disables, [self.cond2])
- self.assertEqual(self.proc.disables, [self.cond1])
+ with self.assertRaises(BadFormatException):
+ self.assertEqual(Todo.by_date(self.db_conn, 'foo'), [])
def test_Todo_on_conditions(self) -> None:
"""Test effect of Todos on Conditions."""
self.assertEqual(self.cond1.is_active, True)
self.assertEqual(self.cond2.is_active, False)
- def test_Todo_enablers_disablers(self) -> None:
- """Test Todo.enablers_for_at/disablers_for_at."""
- assert isinstance(self.cond1.id_, int)
- assert isinstance(self.cond2.id_, int)
- todo1 = Todo(None, self.proc, False, self.date1)
- todo1.save(self.db_conn)
- todo1.set_enables(self.db_conn, [self.cond1.id_])
- todo1.set_disables(self.db_conn, [self.cond2.id_])
- todo1.save(self.db_conn)
- todo2 = Todo(None, self.proc, False, self.date1)
- todo2.save(self.db_conn)
- todo2.set_enables(self.db_conn, [self.cond2.id_])
- todo2.save(self.db_conn)
- todo3 = Todo(None, self.proc, False, self.date2)
- todo3.save(self.db_conn)
- todo3.set_enables(self.db_conn, [self.cond2.id_])
- todo3.save(self.db_conn)
- enablers = Todo.enablers_for_at(self.db_conn, self.cond1, self.date1)
- self.assertEqual(enablers, [todo1])
- enablers = Todo.enablers_for_at(self.db_conn, self.cond1, self.date2)
- self.assertEqual(enablers, [])
- disablers = Todo.disablers_for_at(self.db_conn, self.cond1, self.date1)
- self.assertEqual(disablers, [])
- disablers = Todo.disablers_for_at(self.db_conn, self.cond1, self.date2)
- self.assertEqual(disablers, [])
- enablers = Todo.enablers_for_at(self.db_conn, self.cond2, self.date1)
- self.assertEqual(enablers, [todo2])
- enablers = Todo.enablers_for_at(self.db_conn, self.cond2, self.date2)
- self.assertEqual(enablers, [todo3])
- disablers = Todo.disablers_for_at(self.db_conn, self.cond2, self.date1)
- self.assertEqual(disablers, [todo1])
- disablers = Todo.disablers_for_at(self.db_conn, self.cond2, self.date2)
- self.assertEqual(disablers, [])
-
def test_Todo_children(self) -> None:
"""Test Todo.children relations."""
todo_1 = Todo(None, self.proc, False, self.date1)
def test_Todo_step_tree(self) -> None:
"""Test self-configuration of TodoStepsNode tree for Day view."""
- assert isinstance(self.cond1.id_, int)
- assert isinstance(self.cond2.id_, int)
todo_1 = Todo(None, self.proc, False, self.date1)
todo_1.save(self.db_conn)
assert isinstance(todo_1.id_, int)
# test minimum
- node_0 = TodoStepsNode(todo_1, True, [], False, False)
- self.assertEqual(todo_1.get_step_tree(set(), set()), node_0)
+ node_0 = TodoNode(todo_1, False, [])
+ self.assertEqual(todo_1.get_step_tree(set()), node_0)
# test non_emtpy seen_todo does something
node_0.seen = True
- self.assertEqual(todo_1.get_step_tree({todo_1.id_}, set()), node_0)
+ self.assertEqual(todo_1.get_step_tree({todo_1.id_}), node_0)
# test child shows up
todo_2 = Todo(None, self.proc, False, self.date1)
todo_2.save(self.db_conn)
assert isinstance(todo_2.id_, int)
todo_1.add_child(todo_2)
- node_2 = TodoStepsNode(todo_2, True, [], False, False)
+ node_2 = TodoNode(todo_2, False, [])
node_0.children = [node_2]
node_0.seen = False
- self.assertEqual(todo_1.get_step_tree(set(), set()), node_0)
+ self.assertEqual(todo_1.get_step_tree(set()), node_0)
# test child shows up with child
todo_3 = Todo(None, self.proc, False, self.date1)
todo_3.save(self.db_conn)
assert isinstance(todo_3.id_, int)
todo_2.add_child(todo_3)
- node_3 = TodoStepsNode(todo_3, True, [], False, False)
+ node_3 = TodoNode(todo_3, False, [])
node_2.children = [node_3]
- self.assertEqual(todo_1.get_step_tree(set(), set()), node_0)
+ self.assertEqual(todo_1.get_step_tree(set()), node_0)
# test same todo can be child-ed multiple times at different locations
todo_1.add_child(todo_3)
- node_4 = TodoStepsNode(todo_3, True, [], True, False)
+ node_4 = TodoNode(todo_3, True, [])
node_0.children += [node_4]
- self.assertEqual(todo_1.get_step_tree(set(), set()), node_0)
- # test condition shows up
- todo_1.set_conditions(self.db_conn, [self.cond1.id_])
- node_5 = TodoStepsNode(self.cond1, False, [], False, False)
- node_0.children += [node_5]
- self.assertEqual(todo_1.get_step_tree(set(), set()), node_0)
- # test second condition shows up
- todo_2.set_conditions(self.db_conn, [self.cond2.id_])
- node_6 = TodoStepsNode(self.cond2, False, [], False, False)
- node_2.children += [node_6]
- self.assertEqual(todo_1.get_step_tree(set(), set()), node_0)
- # test second condition is not hidden if fulfilled by non-sibling
- todo_1.set_enables(self.db_conn, [self.cond2.id_])
- self.assertEqual(todo_1.get_step_tree(set(), set()), node_0)
- # test second condition is hidden if fulfilled by sibling
- todo_3.set_enables(self.db_conn, [self.cond2.id_])
- node_2.children.remove(node_6)
- self.assertEqual(todo_1.get_step_tree(set(), set()), node_0)
+ self.assertEqual(todo_1.get_step_tree(set()), node_0)
- def test_Todo_unsatisfied_steps(self) -> None:
- """Test options of satisfying unfulfilled Process.explicit_steps."""
+ def test_Todo_create_with_children(self) -> None:
+ """Test parenthood guaranteeds of Todo.create_with_children."""
assert isinstance(self.proc.id_, int)
- todo_1 = Todo(None, self.proc, False, self.date1)
- todo_1.save(self.db_conn)
proc2 = Process(None)
proc2.save(self.db_conn)
assert isinstance(proc2.id_, int)
proc4 = Process(None)
proc4.save(self.db_conn)
assert isinstance(proc4.id_, int)
+ # make proc4 step of proc3
proc3.set_steps(self.db_conn, [(None, proc4.id_, None)])
+ # give proc2 three steps; 2× proc1, 1× proc3
proc2.set_steps(self.db_conn, [(None, self.proc.id_, None),
(None, self.proc.id_, None),
(None, proc3.id_, None)])
- todo_2 = Todo(None, proc2, False, self.date1)
- todo_2.save(self.db_conn)
- # test empty adoption does nothing
- todo_2.adopt_from([])
- self.assertEqual(todo_2.children, [])
- # test basic adoption
- todo_2.adopt_from([todo_1])
- self.assertEqual(todo_2.children, [todo_1])
- self.assertEqual(todo_1.parents, [todo_2])
- # test making missing children
- todo_2.make_missing_children(self.db_conn)
- todo_3 = Todo.by_id(self.db_conn, 3)
- todo_4 = Todo.by_id(self.db_conn, 4)
- self.assertEqual(todo_2.children, [todo_1, todo_3, todo_4])
- self.assertEqual(todo_3.process, self.proc)
- self.assertEqual(todo_3.parents, [todo_2])
- self.assertEqual(todo_3.children, [])
- self.assertEqual(todo_4.process, proc3)
- self.assertEqual(todo_4.parents, [todo_2])
- # test .make_missing_children doesn't further than top-level
- self.assertEqual(todo_4.children, [])
- # test .make_missing_children lower down the tree
- todo_4.make_missing_children(self.db_conn)
- todo_5 = Todo.by_id(self.db_conn, 5)
- self.assertEqual(todo_5.process, proc4)
- self.assertEqual(todo_4.children, [todo_5])
- self.assertEqual(todo_5.parents, [todo_4])
+ # test mere creation does nothing
+ todo_ignore = Todo(None, proc2, False, self.date1)
+ todo_ignore.save(self.db_conn)
+ self.assertEqual(todo_ignore.children, [])
+ # test create_with_children on step-less does nothing
+ todo_1 = Todo.create_with_children(self.db_conn, self.proc.id_,
+ self.date1)
+ self.assertEqual(todo_1.children, [])
+ self.assertEqual(len(Todo.all(self.db_conn)), 2)
+ # test create_with_children adopts and creates, and down tree too
+ todo_2 = Todo.create_with_children(self.db_conn, proc2.id_, self.date1)
+ self.assertEqual(3, len(todo_2.children))
+ self.assertEqual(todo_1, todo_2.children[0])
+ self.assertEqual(self.proc, todo_2.children[1].process)
+ self.assertEqual(proc3, todo_2.children[2].process)
+ todo_3 = todo_2.children[2]
+ self.assertEqual(len(todo_3.children), 1)
+ self.assertEqual(todo_3.children[0].process, proc4)
def test_Todo_singularity(self) -> None:
"""Test pointers made for single object keep pointing to it."""
- todo = Todo(None, self.proc, False, self.date1)
- todo.save(self.db_conn)
- retrieved_todo = Todo.by_id(self.db_conn, 1)
- todo.is_done = True
- self.assertEqual(retrieved_todo.is_done, True)
- retrieved_todo = Todo.by_date(self.db_conn, self.date1)[0]
- retrieved_todo.is_done = False
- self.assertEqual(todo.is_done, False)
+ self.check_singularity('is_done', True, self.proc, False, self.date1)
def test_Todo_remove(self) -> None:
"""Test removal."""
Todo.by_id(self.db_conn, todo_1.id_)
self.assertEqual(todo_0.children, [])
self.assertEqual(todo_2.parents, [])
+ todo_2.comment = 'foo'
+ with self.assertRaises(HandledException):
+ todo_2.remove(self.db_conn)
+ todo_2.comment = ''
+ todo_2.effort = 5
+ with self.assertRaises(HandledException):
+ todo_2.remove(self.db_conn)
+
+ def test_Todo_autoremoval(self) -> None:
+ """"Test automatic removal for Todo.effort < 0."""
+ todo_1 = Todo(None, self.proc, False, self.date1)
+ todo_1.save(self.db_conn)
+ todo_1.comment = 'foo'
+ todo_1.effort = -0.1
+ todo_1.save(self.db_conn)
+ Todo.by_id(self.db_conn, todo_1.id_)
+ todo_1.comment = ''
+ todo_1.save(self.db_conn)
+ with self.assertRaises(NotFoundException):
+ Todo.by_id(self.db_conn, todo_1.id_)
class TestsWithServer(TestCaseWithServer):
self.post_process(2)
proc = Process.by_id(self.db_conn, 1)
proc2 = Process.by_id(self.db_conn, 2)
- form_data = {'comment': ''}
+ form_data = {'day_comment': ''}
self.check_post(form_data, '/day?date=2024-01-01', 302)
self.assertEqual(Todo.by_date(self.db_conn, '2024-01-01'), [])
form_data['new_todo'] = str(proc.id_)
return Todo.by_date(self.db_conn, '2024-01-01')[0]
# test minimum
self.post_process()
- self.check_post({'comment': '', 'new_todo': 1},
+ self.check_post({'day_comment': '', 'new_todo': 1},
'/day?date=2024-01-01', 302)
# test posting to bad URLs
self.check_post({}, '/todo=', 404)
self.check_post({'adopt': 1}, '/todo?id=1', 400)
self.check_post({'adopt': 2}, '/todo?id=1', 404)
# test posting second todo of same process
- self.check_post({'comment': '', 'new_todo': 1},
+ self.check_post({'day_comment': '', 'new_todo': 1},
'/day?date=2024-01-01', 302)
# test todo 1 adopting todo 2
todo1 = post_and_reload({'adopt': 2})
"""Test Todos posted to Day view may adopt existing Todos."""
form_data = self.post_process()
form_data = self.post_process(2, form_data | {'new_top_step': 1})
- form_data = {'comment': '', 'new_todo': 1}
+ form_data = {'day_comment': '', 'new_todo': 1}
self.check_post(form_data, '/day?date=2024-01-01', 302)
form_data['new_todo'] = 2
self.check_post(form_data, '/day?date=2024-01-01', 302)
self.assertEqual(todo2.children, [todo1])
self.assertEqual(todo2.parents, [])
+ def test_do_POST_day_todo_multiple(self) -> None:
+ """Test multiple Todos can be posted to Day view."""
+ form_data = self.post_process()
+ form_data = self.post_process(2)
+ form_data = {'day_comment': '', 'new_todo': [1, 2]}
+ self.check_post(form_data, '/day?date=2024-01-01', 302)
+ todo1 = Todo.by_date(self.db_conn, '2024-01-01')[0]
+ todo2 = Todo.by_date(self.db_conn, '2024-01-01')[1]
+ self.assertEqual(todo1.process.id_, 1)
+ self.assertEqual(todo2.process.id_, 2)
+
+ def test_do_POST_day_todo_multiple_inner_adoption(self) -> None:
+ """Test multiple Todos can be posted to Day view w. inner adoption."""
+
+ def key_order_func(t: Todo) -> int:
+ assert isinstance(t.process.id_, int)
+ return t.process.id_
+
+ def check_adoption(date: str, new_todos: list[int]) -> None:
+ form_data = {'day_comment': '', 'new_todo': new_todos}
+ self.check_post(form_data, f'/day?date={date}', 302)
+ day_todos = Todo.by_date(self.db_conn, date)
+ day_todos.sort(key=key_order_func)
+ todo1 = day_todos[0]
+ todo2 = day_todos[1]
+ self.assertEqual(todo1.children, [])
+ self.assertEqual(todo1.parents, [todo2])
+ self.assertEqual(todo2.children, [todo1])
+ self.assertEqual(todo2.parents, [])
+
+ def check_nesting_adoption(process_id: int, date: str,
+ new_top_steps: list[int]) -> None:
+ form_data = self.post_process()
+ form_data = self.post_process(process_id,
+ form_data |
+ {'new_top_step': new_top_steps})
+ form_data = {'day_comment': '', 'new_todo': [process_id]}
+ self.check_post(form_data, f'/day?date={date}', 302)
+ day_todos = Todo.by_date(self.db_conn, date)
+ day_todos.sort(key=key_order_func, reverse=True)
+ self.assertEqual(len(day_todos), 3)
+ todo1 = day_todos[0] # process of process_id
+ todo2 = day_todos[1] # process 2
+ todo3 = day_todos[2] # process 1
+ self.assertEqual(sorted(todo1.children), sorted([todo2, todo3]))
+ self.assertEqual(todo1.parents, [])
+ self.assertEqual(todo2.children, [todo3])
+ self.assertEqual(todo2.parents, [todo1])
+ self.assertEqual(todo3.children, [])
+ self.assertEqual(sorted(todo3.parents), sorted([todo2, todo1]))
+
+ form_data = self.post_process()
+ form_data = self.post_process(2, form_data | {'new_top_step': 1})
+ check_adoption('2024-01-01', [1, 2])
+ check_adoption('2024-01-02', [2, 1])
+ check_nesting_adoption(3, '2024-01-03', [1, 2])
+ check_nesting_adoption(4, '2024-01-04', [2, 1])
+
+ def test_do_POST_day_todo_doneness(self) -> None:
+ """Test Todo doneness can be posted to Day view."""
+ form_data = self.post_process()
+ form_data = {'day_comment': '', 'new_todo': [1]}
+ self.check_post(form_data, '/day?date=2024-01-01', 302)
+ todo = Todo.by_date(self.db_conn, '2024-01-01')[0]
+ form_data = {'day_comment': '', 'todo_id': [1]}
+ self.check_post(form_data, '/day?date=2024-01-01', 302)
+ self.assertEqual(todo.is_done, False)
+ form_data = {'day_comment': '', 'todo_id': [1], 'done': [1]}
+ self.check_post(form_data, '/day?date=2024-01-01', 302)
+ self.assertEqual(todo.is_done, True)
+
def test_do_GET_todo(self) -> None:
"""Test GET /todo response codes."""
self.post_process()
- form_data = {'comment': '', 'new_todo': 1}
+ form_data = {'day_comment': '', 'new_todo': 1}
self.check_post(form_data, '/day?date=2024-01-01', 302)
self.check_get('/todo', 400)
self.check_get('/todo?id=', 400)
class TestCaseSansDB(TestCase):
"""Tests requiring no DB setup."""
checked_class: Any
+ do_id_test: bool = False
+ default_init_args: list[Any] = []
+ versioned_defaults_to_test: dict[str, str | float] = {}
- def check_id_setting(self, *args: Any) -> None:
+ def test_id_setting(self) -> None:
"""Test .id_ being set and its legal range being enforced."""
+ if not self.do_id_test:
+ return
with self.assertRaises(HandledException):
- self.checked_class(0, *args)
- obj = self.checked_class(5, *args)
+ self.checked_class(0, *self.default_init_args)
+ obj = self.checked_class(5, *self.default_init_args)
self.assertEqual(obj.id_, 5)
- def check_versioned_defaults(self, attrs: dict[str, Any]) -> None:
+ def test_versioned_defaults(self) -> None:
"""Test defaults of VersionedAttributes."""
- obj = self.checked_class(None)
- for k, v in attrs.items():
+ if len(self.versioned_defaults_to_test) == 0:
+ return
+ obj = self.checked_class(1, *self.default_init_args)
+ for k, v in self.versioned_defaults_to_test.items():
self.assertEqual(getattr(obj, k).newest, v)
"""Module tests not requiring DB setup."""
checked_class: Any
default_ids: tuple[int | str, int | str, int | str] = (1, 2, 3)
+ default_init_kwargs: dict[str, Any] = {}
+ test_versioneds: dict[str, type] = {}
def setUp(self) -> None:
Condition.empty_cache()
ProcessStep.empty_cache()
Todo.empty_cache()
timestamp = datetime.now().timestamp()
- self.db_file = DatabaseFile(f'test_db:{timestamp}')
- self.db_file.remake()
+ self.db_file = DatabaseFile.create_at(f'test_db:{timestamp}')
self.db_conn = DatabaseConnection(self.db_file)
def tearDown(self) -> None:
self.db_conn.close()
remove_file(self.db_file.path)
+ def test_saving_and_caching(self) -> None:
+ """Test storage and initialization of instances and attributes."""
+ if not hasattr(self, 'checked_class'):
+ return
+ self.check_saving_and_caching(id_=1, **self.default_init_kwargs)
+ obj = self.checked_class(None, **self.default_init_kwargs)
+ obj.save(self.db_conn)
+ self.assertEqual(obj.id_, 2)
+ for k, v in self.test_versioneds.items():
+ self.check_saving_of_versioned(k, v)
+
def check_storage(self, content: list[Any]) -> None:
"""Test cache and DB equal content."""
expected_cache = {}
row)]
self.assertEqual(sorted(content), sorted(db_found))
- def check_saving_and_caching(self, **kwargs: Any) -> Any:
+ def check_saving_and_caching(self, **kwargs: Any) -> None:
"""Test instance.save in its core without relations."""
obj = self.checked_class(**kwargs) # pylint: disable=not-callable
# check object init itself doesn't store anything yet
"""POST basic Process."""
if not form_data:
form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1}
- self.check_post(form_data, '/process?id=', 302, f'/process?id={id_}')
+ self.check_post(form_data, f'/process?id={id_}', 302,
+ f'/process?id={id_}')
return form_data