from plomtask.exceptions import HandledException
-class Condition(BaseModel[int]):
+class Condition(BaseModel):
"""Non-Process dependency for ProcessSteps and Todos."""
table_name = 'conditions'
to_save_simples = ['is_active']
"""Collecting Day and date-related items."""
from __future__ import annotations
-from typing import Any
+from typing import Any, Self
from sqlite3 import Row
from datetime import datetime, timedelta
-from plomtask.db import DatabaseConnection, BaseModel
+from plomtask.db import DatabaseConnection, BaseModel, BaseModelId
from plomtask.todos import Todo
from plomtask.dating import (DATE_FORMAT, valid_date)
-class Day(BaseModel[str]):
+class Day(BaseModel):
"""Individual days defined by their dates."""
table_name = 'days'
to_save_simples = ['comment']
self.comment = comment
self.todos: list[Todo] = []
- def __lt__(self, other: Day) -> bool:
+ def __lt__(self, other: Self) -> bool:
return self.date < other.date
@classmethod
def from_table_row(cls, db_conn: DatabaseConnection, row: Row | list[Any]
- ) -> Day:
+ ) -> Self:
"""Make from DB row, with linked Todos."""
day = super().from_table_row(db_conn, row)
assert isinstance(day.id_, str)
return day
@classmethod
- def by_id(cls, db_conn: DatabaseConnection, id_: str) -> Day:
+ def by_id(cls, db_conn: DatabaseConnection, id_: BaseModelId) -> Self:
"""Extend BaseModel.by_id
Checks Todo.days_to_update if we need to a retrieved Day's .todos,
and also ensures we're looking for proper dates and not strings like
"yesterday" by enforcing the valid_date translation.
"""
+ assert isinstance(id_, str)
possibly_translated_date = valid_date(id_)
day = super().by_id(db_conn, possibly_translated_date)
if day.id_ in Todo.days_to_update:
+ assert isinstance(day.id_, str)
Todo.days_to_update.remove(day.id_)
day.todos = Todo.by_date(db_conn, day.id_)
return day
@classmethod
- def with_filled_gaps(cls, days: list[Day], start_date: str, end_date: str
- ) -> list[Day]:
+ def with_filled_gaps(cls, days: list[Self], start_date: str, end_date: str
+ ) -> list[Self]:
"""In days, fill with (un-stored) Days gaps between start/end_date."""
days = days[:]
start_date, end_date = valid_date(start_date), valid_date(end_date)
days = [d for d in days if d.date >= start_date and d.date <= end_date]
days.sort()
if start_date not in [d.date for d in days]:
- days[:] = [Day(start_date)] + days
+ days[:] = [cls(start_date)] + days
if end_date not in [d.date for d in days]:
- days += [Day(end_date)]
+ days += [cls(end_date)]
if len(days) > 1:
gapless_days = []
for i, day in enumerate(days):
gapless_days += [day]
if i < len(days) - 1:
while day.next_date != days[i+1].date:
- day = Day(day.next_date)
+ day = cls(day.next_date)
gapless_days += [day]
days[:] = gapless_days
return days
from os.path import isfile
from difflib import Differ
from sqlite3 import connect as sql_connect, Cursor, Row
-from typing import Any, Self, TypeVar, Generic, Callable
+from typing import Any, Self, Callable
from plomtask.exceptions import (HandledException, NotFoundException,
BadFormatException)
from plomtask.dating import valid_date
self.exec(f'DELETE FROM {table_name} WHERE {key} =', (target,))
-BaseModelId = TypeVar('BaseModelId', int, str)
-BaseModelInstance = TypeVar('BaseModelInstance', bound='BaseModel[Any]')
+BaseModelId = int | str
-class BaseModel(Generic[BaseModelId]):
+class BaseModel:
"""Template for most of the models we use/derive from the DB."""
table_name = ''
to_save_simples: list[str] = []
return list(cls.versioned_defaults.keys())
@property
- def as_dict_and_refs(self) -> tuple[dict[str, object],
- list[BaseModel[int] | BaseModel[str]]]:
+ def as_dict_and_refs(self) -> tuple[dict[str, object], list[BaseModel]]:
"""Return self as json.dumps-ready dict, list of referenced objects."""
d: dict[str, object] = {'id': self.id_}
- refs: list[BaseModel[int] | BaseModel[str]] = []
+ refs: list[BaseModel] = []
for to_save in self.to_save_simples:
d[to_save] = getattr(self, to_save)
if len(self.to_save_versioned()) > 0:
cls.cache_ = {}
@classmethod
- def get_cache(cls: type[BaseModelInstance]
- ) -> dict[Any, BaseModelInstance]:
+ def get_cache(cls) -> dict[BaseModelId, Self]:
"""Get cache dictionary, create it if not yet existing."""
if not hasattr(cls, 'cache_'):
- d: dict[Any, BaseModelInstance] = {}
+ d: dict[BaseModelId, BaseModel] = {}
cls.cache_ = d
return cls.cache_
@classmethod
- def _get_cached(cls: type[BaseModelInstance],
- id_: BaseModelId
- ) -> BaseModelInstance | None:
+ def _get_cached(cls, id_: BaseModelId) -> Self | None:
"""Get object of id_ from class's cache, or None if not found."""
cache = cls.get_cache()
if id_ in cache:
# object retrieval and generation
@classmethod
- def from_table_row(cls: type[BaseModelInstance],
- # pylint: disable=unused-argument
+ def from_table_row(cls,
db_conn: DatabaseConnection,
- row: Row | list[Any]) -> BaseModelInstance:
+ row: Row | list[Any]) -> Self:
"""Make from DB row (sans relations), update DB cache with it."""
obj = cls(*row)
assert obj.id_ is not None
return cls(id_)
@classmethod
- def all(cls: type[BaseModelInstance],
- db_conn: DatabaseConnection) -> list[BaseModelInstance]:
+ def all(cls, db_conn: DatabaseConnection) -> list[Self]:
"""Collect all objects of class into list.
Note that this primarily returns the contents of the cache, and only
cache is always instantly cleaned of any items that would be removed
from the DB.
"""
- items: dict[BaseModelId, BaseModelInstance] = {}
+ items: dict[BaseModelId, Self] = {}
for k, v in cls.get_cache().items():
items[k] = v
already_recorded = items.keys()
return sorted(list(items.values()))
@classmethod
- def by_date_range_with_limits(cls: type[BaseModelInstance],
+ def by_date_range_with_limits(cls,
db_conn: DatabaseConnection,
date_range: tuple[str, str],
date_col: str = 'day'
- ) -> tuple[list[BaseModelInstance], str,
- str]:
+ ) -> tuple[list[Self], str, str]:
"""Return list of items in DB within (closed) date_range interval.
If no range values provided, defaults them to 'yesterday' and
return items, start_date, end_date
@classmethod
- def matching(cls: type[BaseModelInstance], db_conn: DatabaseConnection,
- pattern: str) -> list[BaseModelInstance]:
+ def matching(cls, db_conn: DatabaseConnection, pattern: str) -> list[Self]:
"""Return all objects whose .to_search match pattern."""
items = cls.all(db_conn)
if pattern:
table_name = self.table_name
cursor = db_conn.exec(f'REPLACE INTO {table_name} VALUES', values)
if not isinstance(self.id_, str):
- self.id_ = cursor.lastrowid # type: ignore[assignment]
+ self.id_ = cursor.lastrowid
self.cache()
for attr_name in self.to_save_versioned():
getattr(self, attr_name).save(db_conn)
def flatten(node: object) -> object:
- def update_library_with(
- item: BaseModel[int] | BaseModel[str]) -> None:
+ def update_library_with(item: BaseModel) -> None:
cls_name = item.__class__.__name__
if cls_name not in library:
library[cls_name] = {}
for process_id in owned_ids:
Process.by_id(self._conn, process_id) # to ensure ID exists
preset_top_step = process_id
+ assert not isinstance(process.id_, str)
return {'process': process,
'is_new': not exists,
'preset_top_step': preset_top_step,
# pylint: disable=too-many-locals
# pylint: disable=too-many-branches
# pylint: disable=too-many-statements
- assert todo.id_ is not None
+ assert isinstance(todo.id_, int)
adoptees = [(id_, todo.id_) for id_ in self._form.get_all_int('adopt')]
to_make = {'full': [(id_, todo.id_)
for id_ in self._form.get_all_int('make_full')],
"""Collecting Processes and Process-related items."""
from __future__ import annotations
-from typing import Set, Any
+from typing import Set, Self, Any
from sqlite3 import Row
from plomtask.misc import DictableNode
from plomtask.db import DatabaseConnection, BaseModel
'is_suppressed']
-class Process(BaseModel[int], ConditionsRelations):
+class Process(BaseModel, ConditionsRelations):
"""Template for, and metadata for, Todos, and their arrangements."""
# pylint: disable=too-many-instance-attributes
table_name = 'processes'
'title': lambda p: p.title.newest}
def __init__(self, id_: int | None, calendarize: bool = False) -> None:
- BaseModel.__init__(self, id_)
+ super().__init__(id_)
ConditionsRelations.__init__(self)
for name in ['title', 'description', 'effort']:
attr = VersionedAttribute(self, f'process_{name}s',
self.n_owners: int | None = None # only set by from_table_row
@classmethod
- def from_table_row(cls, db_conn: DatabaseConnection,
- row: Row | list[Any]) -> Process:
+ def from_table_row(cls, db_conn: DatabaseConnection, row: Row | list[Any]
+ ) -> Self:
"""Make from DB row, with dependencies."""
process = super().from_table_row(db_conn, row)
assert process.id_ is not None
process.n_owners = len(process.used_as_step_by(db_conn))
return process
- def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Process]:
+ def used_as_step_by(self, db_conn: DatabaseConnection) -> list[Self]:
"""Return Processes using self for a ProcessStep."""
if not self.id_:
return []
def get_steps(self,
db_conn: DatabaseConnection,
- external_owner: Process | None = None
+ external_owner: Self | None = None
) -> list[ProcessStepsNode]:
"""Return tree of depended-on explicit and implicit ProcessSteps."""
owners_old = self.used_as_step_by(db_conn)
losers = [o for o in owners_old if o.id_ not in owner_ids]
owners_old_ids = [o.id_ for o in owners_old]
- winners = [Process.by_id(db_conn, id_) for id_ in owner_ids
+ winners = [self.by_id(db_conn, id_) for id_ in owner_ids
if id_ not in owners_old_ids]
steps_to_remove = []
for loser in losers:
super().remove(db_conn)
-class ProcessStep(BaseModel[int]):
+class ProcessStep(BaseModel):
"""Sub-unit of Processes."""
table_name = 'process_steps'
to_save_simples = ['owner_id', 'step_process_id', 'parent_step_id']
"""Actionables."""
from __future__ import annotations
-from typing import Any, Set
+from typing import Any, Self, Set
from sqlite3 import Row
from plomtask.misc import DictableNode
from plomtask.db import DatabaseConnection, BaseModel
_to_dict = ['node_id', 'todo', 'process', 'children', 'fillable']
-class Todo(BaseModel[int], ConditionsRelations):
+class Todo(BaseModel, ConditionsRelations):
"""Individual actionable."""
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-public-methods
date: str, comment: str = '',
effort: None | float = None,
calendarize: bool = False) -> None:
- BaseModel.__init__(self, id_)
+ super().__init__(id_)
ConditionsRelations.__init__(self)
if process.id_ is None:
raise NotFoundException('Process of Todo without ID (not saved?)')
@classmethod
def by_date_range(cls, db_conn: DatabaseConnection,
- date_range: tuple[str, str] = ('', '')) -> list[Todo]:
+ date_range: tuple[str, str] = ('', '')) -> list[Self]:
"""Collect Todos of Days within date_range."""
todos, _, _ = cls.by_date_range_with_limits(db_conn, date_range)
return todos
def ensure_children(self, db_conn: DatabaseConnection) -> None:
"""Ensure Todo children (create or adopt) demanded by Process chain."""
- def walk_steps(parent: Todo, step_node: ProcessStepsNode) -> Todo:
- adoptables = [t for t in Todo.by_date(db_conn, parent.date)
+ def walk_steps(parent: Self, step_node: ProcessStepsNode) -> Todo:
+ adoptables = [t for t in self.by_date(db_conn, parent.date)
if (t not in parent.children)
and (t != parent)
and step_node.process.id_ == t.process_id]
satisfier = adoptable
break
if not satisfier:
- satisfier = Todo(None, step_node.process, False, parent.date)
+ satisfier = self.__class__(None, step_node.process, False,
+ parent.date)
satisfier.save(db_conn)
sub_step_nodes = sorted(
step_node.steps,
@classmethod
def from_table_row(cls, db_conn: DatabaseConnection,
- row: Row | list[Any]) -> Todo:
+ row: Row | list[Any]) -> Self:
"""Make from DB row, with dependencies."""
if row[1] == 0:
raise NotFoundException('calling Todo of '
@classmethod
def by_process_id(cls, db_conn: DatabaseConnection,
- process_id: int | None) -> list[Todo]:
+ process_id: int | None) -> list[Self]:
"""Collect all Todos of Process of process_id."""
return [t for t in cls.all(db_conn) if t.process.id_ == process_id]
@classmethod
- def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Todo]:
+ def by_date(cls, db_conn: DatabaseConnection, date: str) -> list[Self]:
"""Collect all Todos for Day of date."""
return cls.by_date_range(db_conn, (date, date))
def get_step_tree(self, seen_todos: set[int]) -> TodoNode:
"""Return tree of depended-on Todos."""
- def make_node(todo: Todo) -> TodoNode:
+ def make_node(todo: Self) -> TodoNode:
children = []
seen = todo.id_ in seen_todos
assert isinstance(todo.id_, int)
def tree_effort(self) -> float:
"""Return sum of performed efforts of self and all descendants."""
- def walk_tree(node: Todo) -> float:
+ def walk_tree(node: Self) -> float:
local_effort = 0.0
for child in node.children:
local_effort += walk_tree(child)
return walk_tree(self)
- def add_child(self, child: Todo) -> None:
+ def add_child(self, child: Self) -> None:
"""Add child to self.children, avoid recursion, update parenthoods."""
- def walk_steps(node: Todo) -> None:
+ def walk_steps(node: Self) -> None:
if node.id_ == self.id_:
raise BadFormatException('bad child choice causes recursion')
for child in node.children:
self.children += [child]
child.parents += [self]
- def remove_child(self, child: Todo) -> None:
+ def remove_child(self, child: Self) -> None:
"""Remove child from self.children, update counter relations."""
if child not in self.children:
raise HandledException('Cannot remove un-parented child.')