from plomtask.days import Day, todays_date
from plomtask.misc import HandledException
from plomtask.db import DatabaseConnection
+from plomtask.processes import Process
TEMPLATES_DIR = 'templates'
elif 'day' == site:
date = params.get('date', [todays_date()])[0]
html = self.do_GET_day(conn, date)
+ elif 'process' == site:
+ id_ = params.get('id', [None])[0]
+ try:
+ id_ = int(id_) if id_ else None
+ except ValueError as e:
+ raise HandledException(f'Bad ?id= value: {id_}') from e
+ html = self.do_GET_process(conn, id_)
+ elif 'processes' == site:
+ html = self.do_GET_processes(conn)
else:
raise HandledException('Test!')
conn.commit()
day = Day.by_date(conn, date, create=True)
return self.server.jinja.get_template('day.html').render(day=day)
+ def do_GET_process(self, conn: DatabaseConnection, id_: int | None):
+ """Show process of id_."""
+ return self.server.jinja.get_template('process.html').render(
+ process=Process.by_id(conn, id_, create=True))
+
+ def do_GET_processes(self, conn: DatabaseConnection):
+ """Show all Processes."""
+ return self.server.jinja.get_template('processes.html').render(
+ processes=Process.all(conn))
+
def do_POST(self):
"""Handle any POST request."""
try:
if 'day' == site:
date = params.get('date', [None])[0]
self.do_POST_day(conn, date, postvars)
+ elif 'process' == site:
+ id_ = params.get('id', [None])[0]
+ try:
+ id_ = int(id_) if id_ else None
+ except ValueError as e:
+ raise HandledException(f'Bad ?id= value: {id_}') from e
+ self.do_POST_process(conn, id_, postvars)
conn.commit()
conn.close()
self._redirect('/')
day.comment = postvars['comment'][0]
day.save(conn)
+ def do_POST_process(self, conn: DatabaseConnection, id_: int | None,
+ postvars: dict):
+ """Update or insert Process of id_ and fields defined in postvars."""
+ process = Process.by_id(conn, id_, create=True)
+ if process:
+ process.title.set(postvars['title'][0])
+ process.description.set(postvars['description'][0])
+ effort = postvars['effort'][0]
+ try:
+ process.effort.set(float(effort))
+ except ValueError as e:
+ raise HandledException(f'Bad effort value: {effort}') from e
+ process.save(conn)
+
def _init_handling(self):
conn = DatabaseConnection(self.server.db)
parsed_url = urlparse(self.path)
--- /dev/null
+"""Collecting Processes and Process-related items."""
+from __future__ import annotations
+from sqlite3 import Row
+from datetime import datetime
+from plomtask.db import DatabaseConnection
+
+
+class Process:
+ """Template for, and metadata for, Todos, and their arrangements."""
+
+ def __init__(self, id_: int | None) -> None:
+ self.id_ = id_ if id_ != 0 else None # to avoid DB-confusing rowid=0
+ self.title = VersionedAttribute(self, 'title', 'UNNAMED')
+ self.description = VersionedAttribute(self, 'description', '')
+ self.effort = VersionedAttribute(self, 'effort', 1.0)
+
+ @classmethod
+ def from_table_row(cls, row: Row) -> Process:
+ """Make Process from database row, with empty VersionedAttributes."""
+ return cls(row[0])
+
+ @classmethod
+ def all(cls, db_conn: DatabaseConnection) -> list[Process]:
+ """Collect all Processes and their connected VersionedAttributes."""
+ processes = {}
+ for row in db_conn.exec('SELECT * FROM processes'):
+ process = cls.from_table_row(row)
+ processes[process.id_] = process
+ for row in db_conn.exec('SELECT * FROM process_titles'):
+ processes[row[0]].title.history[row[1]] = row[2]
+ for row in db_conn.exec('SELECT * FROM process_descriptions'):
+ processes[row[0]].description.history[row[1]] = row[2]
+ for row in db_conn.exec('SELECT * FROM process_efforts'):
+ processes[row[0]].effort.history[row[1]] = row[2]
+ return list(processes.values())
+
+ @classmethod
+ def by_id(cls, db_conn: DatabaseConnection,
+ id_: int | None, create: bool = False) -> Process | None:
+ """Collect all Processes and their connected VersionedAttributes."""
+ process = None
+ for row in db_conn.exec('SELECT * FROM processes '
+ 'WHERE id = ?', (id_,)):
+ process = cls(row[0])
+ break
+ if create and not process:
+ process = Process(id_)
+ if process:
+ for row in db_conn.exec('SELECT * FROM process_titles '
+ 'WHERE process_id = ?', (process.id_,)):
+ process.title.history[row[1]] = row[2]
+ for row in db_conn.exec('SELECT * FROM process_descriptions '
+ 'WHERE process_id = ?', (process.id_,)):
+ process.description.history[row[1]] = row[2]
+ for row in db_conn.exec('SELECT * FROM process_efforts '
+ 'WHERE process_id = ?', (process.id_,)):
+ process.effort.history[row[1]] = row[2]
+ return process
+
+ def save(self, db_conn: DatabaseConnection) -> None:
+ """Add (or re-write) self and connected VersionedAttributes to DB."""
+ cursor = db_conn.exec('REPLACE INTO processes VALUES (?)', (self.id_,))
+ self.id_ = cursor.lastrowid
+ self.title.save(db_conn)
+ self.description.save(db_conn)
+ self.effort.save(db_conn)
+
+
+class VersionedAttribute:
+ """Attributes whose values are recorded as a timestamped history."""
+
+ def __init__(self,
+ parent: Process, name: str, default: str | float) -> None:
+ self.parent = parent
+ self.name = name
+ self.default = default
+ self.history: dict[str, str | float] = {}
+
+ @property
+ def _newest_timestamp(self) -> str:
+ """Return most recent timestamp."""
+ return sorted(self.history.keys())[-1]
+
+ @property
+ def newest(self) -> str | float:
+ """Return most recent value, or self.default if self.history empty."""
+ if 0 == len(self.history):
+ return self.default
+ return self.history[self._newest_timestamp]
+
+ def set(self, value: str | float) -> None:
+ """Add to self.history if and only if not same value as newest one."""
+ if 0 == len(self.history) \
+ or value != self.history[self._newest_timestamp]:
+ self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value
+
+ def at(self, queried_time: str) -> str | float:
+ """Retrieve value of timestamp nearest queried_time from the past."""
+ sorted_timestamps = sorted(self.history.keys())
+ if 0 == len(sorted_timestamps):
+ return self.default
+ selected_timestamp = sorted_timestamps[0]
+ for timestamp in sorted_timestamps[1:]:
+ if timestamp > queried_time:
+ break
+ selected_timestamp = timestamp
+ return self.history[selected_timestamp]
+
+ def save(self, db_conn: DatabaseConnection) -> None:
+ """Save as self.history entries, but first wipe old ones."""
+ db_conn.exec(f'DELETE FROM process_{self.name}s WHERE process_id = ?',
+ (self.parent.id_,))
+ for timestamp, value in self.history.items():
+ db_conn.exec(f'INSERT INTO process_{self.name}s VALUES (?, ?, ?)',
+ (self.parent.id_, timestamp, value))
date TEXT PRIMARY KEY,
comment TEXT NOT NULL
);
+CREATE TABLE process_descriptions (
+ process_id INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ description TEXT NOT NULL,
+ PRIMARY KEY (process_id, timestamp),
+ FOREIGN KEY (process_id) REFERENCES processes(id)
+);
+CREATE TABLE process_efforts (
+ process_id INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ effort REAL NOT NULL,
+ PRIMARY KEY (process_id, timestamp),
+ FOREIGN KEY (process_id) REFERENCES processes(id)
+);
+CREATE TABLE process_titles (
+ process_id INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ title TEXT NOT NULL,
+ PRIMARY KEY (process_id, timestamp),
+ FOREIGN KEY (process_id) REFERENCES processes(id)
+);
+CREATE TABLE processes (
+ id INTEGER PRIMARY KEY
+);