class Condition(BaseModel[int]):
"""Non-Process dependency for ProcessSteps and Todos."""
table_name = 'conditions'
- to_save = ['is_active']
- to_save_versioned = ['title', 'description']
+ to_save_simples = ['is_active']
+ versioned_defaults = {'title': 'UNNAMED', 'description': ''}
to_search = ['title.newest', 'description.newest']
can_create_by_id = True
sorters = {'is_active': lambda c: c.is_active,
def __init__(self, id_: int | None, is_active: bool = False) -> None:
super().__init__(id_)
self.is_active = is_active
- self.title = VersionedAttribute(self, 'condition_titles', 'UNNAMED')
- self.description = VersionedAttribute(self, 'condition_descriptions',
- '')
+ for name in ['title', 'description']:
+ attr = VersionedAttribute(self, f'condition_{name}s',
+ self.versioned_defaults[name])
+ setattr(self, name, attr)
def remove(self, db_conn: DatabaseConnection) -> None:
"""Remove from DB, with VersionedAttributes.
class ConditionsRelations:
"""Methods for handling relations to Conditions, for Todo and Process."""
+ # pylint: disable=too-few-public-methods
def __init__(self) -> None:
self.conditions: list[Condition] = []
self.enables: list[Condition] = []
self.disables: list[Condition] = []
- def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
- target: str = 'conditions') -> None:
- """Set self.[target] to Conditions identified by ids."""
- target_list = getattr(self, target)
- while len(target_list) > 0:
- target_list.pop()
- for id_ in ids:
- target_list += [Condition.by_id(db_conn, id_)]
-
- def set_blockers(self, db_conn: DatabaseConnection,
- ids: list[int]) -> None:
- """Set self.enables to Conditions identified by ids."""
- self.set_conditions(db_conn, ids, 'blockers')
-
- def set_enables(self, db_conn: DatabaseConnection,
- ids: list[int]) -> None:
- """Set self.enables to Conditions identified by ids."""
- self.set_conditions(db_conn, ids, 'enables')
-
- def set_disables(self, db_conn: DatabaseConnection,
- ids: list[int]) -> None:
- """Set self.disables to Conditions identified by ids."""
- self.set_conditions(db_conn, ids, 'disables')
+ def set_condition_relations(self,
+ db_conn: DatabaseConnection,
+ ids_conditions: list[int],
+ ids_blockers: list[int],
+ ids_enables: list[int],
+ ids_disables: list[int]
+ ) -> None:
+ """Set owned Condition lists to those identified by respective IDs."""
+ # pylint: disable=too-many-arguments
+ for ids, target in [(ids_conditions, 'conditions'),
+ (ids_blockers, 'blockers'),
+ (ids_enables, 'enables'),
+ (ids_disables, 'disables')]:
+ target_list = getattr(self, target)
+ while len(target_list) > 0:
+ target_list.pop()
+ for id_ in ids:
+ target_list += [Condition.by_id(db_conn, id_)]
class Day(BaseModel[str]):
"""Individual days defined by their dates."""
table_name = 'days'
- to_save = ['comment']
+ to_save_simples = ['comment']
add_to_dict = ['todos']
can_create_by_id = True
day.todos = Todo.by_date(db_conn, day.id_)
return day
- @classmethod
- def by_date_range_filled(cls, db_conn: DatabaseConnection,
- start: str, end: str) -> list[Day]:
- """Return days existing and non-existing between dates start/end."""
- ret = cls.by_date_range_with_limits(db_conn, (start, end), 'id')
- days, start_date, end_date = ret
- return cls.with_filled_gaps(days, start_date, end_date)
-
@classmethod
def with_filled_gaps(cls, days: list[Day], start_date: str, end_date: str
) -> list[Day]:
- """In days, fill with (un-saved) Days gaps between start/end_date."""
+ """In days, fill with (un-stored) Days gaps between start/end_date."""
+ days = days[:]
+ start_date, end_date = valid_date(start_date), valid_date(end_date)
if start_date > end_date:
- return days
+ return []
+ days = [d for d in days if d.date >= start_date and d.date <= end_date]
days.sort()
if start_date not in [d.date for d in days]:
days[:] = [Day(start_date)] + days
class BaseModel(Generic[BaseModelId]):
"""Template for most of the models we use/derive from the DB."""
table_name = ''
- to_save: list[str] = []
- to_save_versioned: list[str] = []
+ to_save_simples: list[str] = []
to_save_relations: list[tuple[str, str, str, int]] = []
+ versioned_defaults: dict[str, str | float] = {}
add_to_dict: list[str] = []
id_: None | BaseModelId
cache_: dict[BaseModelId, Self]
self.id_ = id_
def __hash__(self) -> int:
- hashable = [self.id_] + [getattr(self, name) for name in self.to_save]
+ hashable = [self.id_] + [getattr(self, name)
+ for name in self.to_save_simples]
for definition in self.to_save_relations:
attr = getattr(self, definition[2])
hashable += [tuple(rel.id_ for rel in attr)]
- for name in self.to_save_versioned:
+ for name in self.to_save_versioned():
hashable += [hash(getattr(self, name))]
return hash(tuple(hashable))
assert isinstance(other.id_, int)
return self.id_ < other.id_
+ @classmethod
+ def to_save_versioned(cls) -> list[str]:
+ """Return keys of cls.versioned_defaults assuming we wanna save 'em."""
+ return list(cls.versioned_defaults.keys())
+
@property
- def as_dict(self) -> dict[str, object]:
- """Return self as (json.dumps-compatible) dict."""
- library: dict[str, dict[str | int, object]] = {}
- d: dict[str, object] = {'id': self.id_, '_library': library}
- for to_save in self.to_save:
- attr = getattr(self, to_save)
- if hasattr(attr, 'as_dict_into_reference'):
- d[to_save] = attr.as_dict_into_reference(library)
- else:
- d[to_save] = attr
- if len(self.to_save_versioned) > 0:
+ def as_dict_and_refs(self) -> tuple[dict[str, object],
+ list[BaseModel[int] | BaseModel[str]]]:
+ """Return self as json.dumps-ready dict, list of referenced objects."""
+ d: dict[str, object] = {'id': self.id_}
+ refs: list[BaseModel[int] | BaseModel[str]] = []
+ for to_save in self.to_save_simples:
+ d[to_save] = getattr(self, to_save)
+ if len(self.to_save_versioned()) > 0:
d['_versioned'] = {}
- for k in self.to_save_versioned:
+ for k in self.to_save_versioned():
attr = getattr(self, k)
assert isinstance(d['_versioned'], dict)
d['_versioned'][k] = attr.history
- for r in self.to_save_relations:
- attr_name = r[2]
- l: list[int | str] = []
- for rel in getattr(self, attr_name):
- l += [rel.as_dict_into_reference(library)]
- d[attr_name] = l
- for k in self.add_to_dict:
- d[k] = [x.as_dict_into_reference(library)
- for x in getattr(self, k)]
- return d
-
- def as_dict_into_reference(self,
- library: dict[str, dict[str | int, object]]
- ) -> int | str:
- """Return self.id_ while writing .as_dict into library."""
- def into_library(library: dict[str, dict[str | int, object]],
- cls_name: str,
- id_: str | int,
- d: dict[str, object]
- ) -> None:
- if cls_name not in library:
- library[cls_name] = {}
- if id_ in library[cls_name]:
- if library[cls_name][id_] != d:
- msg = 'Unexpected inequality of entries for ' +\
- f'_library at: {cls_name}/{id_}'
- raise HandledException(msg)
- else:
- library[cls_name][id_] = d
- as_dict = self.as_dict
- assert isinstance(as_dict['_library'], dict)
- for cls_name, dict_of_objs in as_dict['_library'].items():
- for id_, obj in dict_of_objs.items():
- into_library(library, cls_name, id_, obj)
- del as_dict['_library']
- assert self.id_ is not None
- into_library(library, self.__class__.__name__, self.id_, as_dict)
- assert isinstance(as_dict['id'], (int, str))
- return as_dict['id']
+ rels_to_collect = [rel[2] for rel in self.to_save_relations]
+ rels_to_collect += self.add_to_dict
+ for attr_name in rels_to_collect:
+ rel_list = []
+ for item in getattr(self, attr_name):
+ rel_list += [item.id_]
+ if item not in refs:
+ refs += [item]
+ d[attr_name] = rel_list
+ return d, refs
@classmethod
def name_lowercase(cls) -> str:
@classmethod
def sort_by(cls, seq: list[Any], sort_key: str, default: str = 'title'
) -> str:
- """Sort cls list by cls.sorters[sort_key] (reverse if '-'-prefixed)."""
+ """Sort cls list by cls.sorters[sort_key] (reverse if '-'-prefixed).
+
+ Before cls.sorters[sort_key] is applied, seq is sorted by .id_, to
+ ensure predictability where parts of seq are of same sort value.
+ """
reverse = False
if len(sort_key) > 1 and '-' == sort_key[0]:
sort_key = sort_key[1:]
reverse = True
if sort_key not in cls.sorters:
sort_key = default
+ seq.sort(key=lambda x: x.id_, reverse=reverse)
sorter: Callable[..., Any] = cls.sorters[sort_key]
seq.sort(key=sorter, reverse=reverse)
if reverse:
"""Make from DB row (sans relations), update DB cache with it."""
obj = cls(*row)
assert obj.id_ is not None
- for attr_name in cls.to_save_versioned:
+ for attr_name in cls.to_save_versioned():
attr = getattr(obj, attr_name)
table_name = attr.table_name
for row_ in db_conn.row_where(table_name, 'parent', obj.id_):
date_col: str = 'day'
) -> tuple[list[BaseModelInstance], str,
str]:
- """Return list of items in database within (open) date_range interval.
+ """Return list of items in DB within (closed) date_range interval.
If no range values provided, defaults them to 'yesterday' and
'tomorrow'. Knows to properly interpret these and 'today' as value.
"""Write self to DB and cache and ensure .id_.
Write both to DB, and to cache. To DB, write .id_ and attributes
- listed in cls.to_save[_versioned|_relations].
+ listed in cls.to_save_[simples|versioned|_relations].
Ensure self.id_ by setting it to what the DB command returns as the
last saved row's ID (cursor.lastrowid), EXCEPT if self.id_ already
only the case with the Day class, where it's to be a date string.
"""
values = tuple([self.id_] + [getattr(self, key)
- for key in self.to_save])
+ for key in self.to_save_simples])
table_name = self.table_name
cursor = db_conn.exec_on_vals(f'REPLACE INTO {table_name} VALUES',
values)
if not isinstance(self.id_, str):
self.id_ = cursor.lastrowid # type: ignore[assignment]
self.cache()
- for attr_name in self.to_save_versioned:
+ for attr_name in self.to_save_versioned():
getattr(self, attr_name).save(db_conn)
for table, column, attr_name, key_index in self.to_save_relations:
assert isinstance(self.id_, (int, str))
"""Remove from DB and cache, including dependencies."""
if self.id_ is None or self._get_cached(self.id_) is None:
raise HandledException('cannot remove unsaved item')
- for attr_name in self.to_save_versioned:
+ for attr_name in self.to_save_versioned():
getattr(self, attr_name).remove(db_conn)
for table, column, attr_name, _ in self.to_save_relations:
db_conn.delete_where(table, column, self.id_)
"""Web server stuff."""
from __future__ import annotations
-from dataclasses import dataclass
from typing import Any, Callable
from base64 import b64encode, b64decode
from binascii import Error as binascii_Exception
from plomtask.days import Day
from plomtask.exceptions import (HandledException, BadFormatException,
NotFoundException)
-from plomtask.db import DatabaseConnection, DatabaseFile
+from plomtask.db import DatabaseConnection, DatabaseFile, BaseModel
from plomtask.processes import Process, ProcessStep, ProcessStepsNode
from plomtask.conditions import Condition
-from plomtask.todos import Todo
+from plomtask.todos import Todo, TodoOrProcStepNode, DictableNode
TEMPLATES_DIR = 'templates'
*args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.db = db_file
- self.headers: list[tuple[str, str]] = []
- self._render_mode = 'html'
- self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
-
- def set_json_mode(self) -> None:
- """Make server send JSON instead of HTML responses."""
- self._render_mode = 'json'
- self.headers += [('Content-Type', 'application/json')]
-
- @staticmethod
- def ctx_to_json(ctx: dict[str, object]) -> str:
- """Render ctx into JSON string."""
- def walk_ctx(node: object) -> Any:
- if hasattr(node, 'as_dict_into_reference'):
- if hasattr(node, 'id_') and node.id_ is not None:
- return node.as_dict_into_reference(library)
- if hasattr(node, 'as_dict'):
- return node.as_dict
- if isinstance(node, (list, tuple)):
- return [walk_ctx(x) for x in node]
- if isinstance(node, dict):
- d = {}
- for k, v in node.items():
- d[k] = walk_ctx(v)
- return d
- if isinstance(node, HandledException):
- return str(node)
- return node
- library: dict[str, dict[str | int, object]] = {}
- for k, v in ctx.items():
- ctx[k] = walk_ctx(v)
- ctx['_library'] = library
- return json_dumps(ctx)
-
- def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
- """Render ctx according to self._render_mode.."""
- tmpl_name = f'{tmpl_name}.{self._render_mode}'
- if 'html' == self._render_mode:
- template = self._jinja.get_template(tmpl_name)
- return template.render(ctx)
- return self.__class__.ctx_to_json(ctx)
+ self.render_mode = 'html'
+ self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
class InputsParser:
def __init__(self, dict_: dict[str, list[str]],
strictness: bool = True) -> None:
self.inputs = dict_
- self.strict = strictness
+ self.strict = strictness # return None on absence of key, or fail?
def get_str(self, key: str, default: str = '',
ignore_strict: bool = False) -> str:
"""Retrieve single/first string value of key, or default."""
if key not in self.inputs.keys() or 0 == len(self.inputs[key]):
if self.strict and not ignore_strict:
- raise BadFormatException(f'no value found for key {key}')
+ raise NotFoundException(f'no value found for key {key}')
return default
return self.inputs[key][0]
msg = f'cannot float form field value for key {key}: {val}'
raise BadFormatException(msg) from e
+ def get_float_or_none(self, key: str) -> float | None:
+ """Retrieve float value of key from self.postvars, None if empty."""
+ val = self.get_str(key)
+ if '' == val:
+ return None
+ try:
+ return float(val)
+ except ValueError as e:
+ msg = f'cannot float form field value for key {key}: {val}'
+ raise BadFormatException(msg) from e
+
+ def get_bool(self, key: str) -> bool:
+ """Return if key occurs and maps to a boolean Truth."""
+ return bool(self.get_all_str(key))
+
def get_all_str(self, key: str) -> list[str]:
"""Retrieve list of string values at key."""
if key not in self.inputs.keys():
_form_data: InputsParser
_params: InputsParser
- def _send_page(self,
- ctx: dict[str, Any],
- tmpl_name: str,
- code: int = 200
- ) -> None:
- """Send ctx as proper HTTP response."""
- body = self.server.render(ctx, tmpl_name)
+ def _send_page(
+ self, ctx: dict[str, Any], tmpl_name: str, code: int = 200
+ ) -> None:
+ """HTTP-send ctx as HTML or JSON, as defined by .server.render_mode.
+
+ The differentiation by .server.render_mode serves to allow easily
+ comparable JSON responses for automatic testing.
+ """
+ body: str
+ headers: list[tuple[str, str]] = []
+ if 'html' == self.server.render_mode:
+ tmpl = self.server.jinja.get_template(f'{tmpl_name}.html')
+ body = tmpl.render(ctx)
+ else:
+ body = self._ctx_to_json(ctx)
+ headers += [('Content-Type', 'application/json')]
self.send_response(code)
- for header_tuple in self.server.headers:
+ for header_tuple in headers:
self.send_header(*header_tuple)
self.end_headers()
self.wfile.write(bytes(body, 'utf-8'))
+ def _ctx_to_json(self, ctx: dict[str, object]) -> str:
+ """Render ctx into JSON string.
+
+ Flattens any objects that json.dumps might not want to serialize, and
+ turns occurrences of BaseModel objects into listings of their .id_, to
+ be resolved to a full dict inside a top-level '_library' dictionary,
+ to avoid endless and circular nesting.
+ """
+
+ def flatten(node: object) -> object:
+
+ def update_library_with(
+ item: BaseModel[int] | BaseModel[str]) -> None:
+ cls_name = item.__class__.__name__
+ if cls_name not in library:
+ library[cls_name] = {}
+ if item.id_ not in library[cls_name]:
+ d, refs = item.as_dict_and_refs
+ id_key = '?' if item.id_ is None else item.id_
+ library[cls_name][id_key] = d
+ for ref in refs:
+ update_library_with(ref)
+
+ if isinstance(node, BaseModel):
+ update_library_with(node)
+ return node.id_
+ if isinstance(node, DictableNode):
+ d, refs = node.as_dict_and_refs
+ for ref in refs:
+ update_library_with(ref)
+ return d
+ if isinstance(node, (list, tuple)):
+ return [flatten(item) for item in node]
+ if isinstance(node, dict):
+ d = {}
+ for k, v in node.items():
+ d[k] = flatten(v)
+ return d
+ if isinstance(node, HandledException):
+ return str(node)
+ return node
+
+ library: dict[str, dict[str | int, object]] = {}
+ for k, v in ctx.items():
+ ctx[k] = flatten(v)
+ ctx['_library'] = library
+ return json_dumps(ctx)
+
@staticmethod
def _request_wrapper(http_method: str, not_found_msg: str
) -> Callable[..., Callable[[TaskHandler], None]]:
same, the only difference being the HTML template they are rendered to,
which .do_GET selects from their method name.
"""
- start = self._params.get_str('start')
- end = self._params.get_str('end')
- if not end:
- end = date_in_n_days(366)
- ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
- days, start, end = ret
+ start, end = self._params.get_str('start'), self._params.get_str('end')
+ end = end if end else date_in_n_days(366)
+ days, start, end = Day.by_date_range_with_limits(self.conn,
+ (start, end), 'id')
days = Day.with_filled_gaps(days, start, end)
today = date_in_n_days(0)
return {'start': start, 'end': end, 'days': days, 'today': today}
def do_GET_todo(self, todo: Todo) -> dict[str, object]:
"""Show single Todo of ?id=."""
- @dataclass
- class TodoStepsNode:
- """Collect what's useful for Todo steps tree display."""
- id_: int
- todo: Todo | None
- process: Process | None
- children: list[TodoStepsNode] # pylint: disable=undefined-variable
- fillable: bool = False
-
- def walk_process_steps(id_: int,
+ def walk_process_steps(node_id: int,
process_step_nodes: list[ProcessStepsNode],
- steps_nodes: list[TodoStepsNode]) -> None:
+ steps_nodes: list[TodoOrProcStepNode]) -> int:
for process_step_node in process_step_nodes:
- id_ += 1
- node = TodoStepsNode(id_, None, process_step_node.process, [])
+ node_id += 1
+ node = TodoOrProcStepNode(node_id, None,
+ process_step_node.process, [])
steps_nodes += [node]
- walk_process_steps(id_, list(process_step_node.steps.values()),
- node.children)
+ node_id = walk_process_steps(
+ node_id, list(process_step_node.steps.values()),
+ node.children)
+ return node_id
- def walk_todo_steps(id_: int, todos: list[Todo],
- steps_nodes: list[TodoStepsNode]) -> None:
+ def walk_todo_steps(node_id: int, todos: list[Todo],
+ steps_nodes: list[TodoOrProcStepNode]) -> int:
for todo in todos:
matched = False
for match in [item for item in steps_nodes
matched = True
for child in match.children:
child.fillable = True
- walk_todo_steps(id_, todo.children, match.children)
+ node_id = walk_todo_steps(
+ node_id, todo.children, match.children)
if not matched:
- id_ += 1
- node = TodoStepsNode(id_, todo, None, [])
+ node_id += 1
+ node = TodoOrProcStepNode(node_id, todo, None, [])
steps_nodes += [node]
- walk_todo_steps(id_, todo.children, node.children)
+ node_id = walk_todo_steps(
+ node_id, todo.children, node.children)
+ return node_id
- def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
- ) -> set[int]:
+ def collect_adoptables_keys(
+ steps_nodes: list[TodoOrProcStepNode]) -> set[int]:
ids = set()
for node in steps_nodes:
if not node.todo:
todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
process_tree = todo.process.get_steps(self.conn, None)
- steps_todo_to_process: list[TodoStepsNode] = []
- walk_process_steps(0, list(process_tree.values()),
- steps_todo_to_process)
+ steps_todo_to_process: list[TodoOrProcStepNode] = []
+ last_node_id = walk_process_steps(
+ 0, list(process_tree.values()), steps_todo_to_process)
for steps_node in steps_todo_to_process:
steps_node.fillable = True
- walk_todo_steps(len(steps_todo_to_process), todo_steps,
- steps_todo_to_process)
+ walk_todo_steps(last_node_id, todo_steps, steps_todo_to_process)
adoptables: dict[int, list[Todo]] = {}
any_adoptables = [Todo.by_id(self.conn, t.id_)
for t in Todo.by_date(self.conn, todo.date)
for id_ in collect_adoptables_keys(steps_todo_to_process):
adoptables[id_] = [t for t in any_adoptables
if t.process.id_ == id_]
- return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
+ return {'todo': todo,
+ 'steps_todo_to_process': steps_todo_to_process,
'adoption_candidates_for': adoptables,
- 'process_candidates': Process.all(self.conn),
+ 'process_candidates': sorted(Process.all(self.conn)),
'todo_candidates': any_adoptables,
'condition_candidates': Condition.all(self.conn)}
def do_POST_day(self) -> str:
"""Update or insert Day of date and Todos mapped to it."""
# pylint: disable=too-many-locals
- date = self._params.get_str('date')
- day_comment = self._form_data.get_str('day_comment')
- make_type = self._form_data.get_str('make_type')
+ try:
+ date = self._params.get_str('date')
+ day_comment = self._form_data.get_str('day_comment')
+ make_type = self._form_data.get_str('make_type')
+ except NotFoundException as e:
+ raise BadFormatException from e
old_todos = self._form_data.get_all_int('todo_id')
new_todos = self._form_data.get_all_int('new_todo')
comments = self._form_data.get_all_str('comment')
def do_POST_todo(self, todo: Todo) -> str:
"""Update Todo and its children."""
# pylint: disable=too-many-locals
+ # pylint: disable=too-many-branches
adopted_child_ids = self._form_data.get_all_int('adopt')
processes_to_make_full = self._form_data.get_all_int('make_full')
processes_to_make_empty = self._form_data.get_all_int('make_empty')
- fill_fors = self._form_data.get_first_strings_starting('fill_for_')
- effort = self._form_data.get_str('effort', ignore_strict=True)
- conditions = self._form_data.get_all_int('conditions')
- disables = self._form_data.get_all_int('disables')
- blockers = self._form_data.get_all_int('blockers')
- enables = self._form_data.get_all_int('enables')
- is_done = len(self._form_data.get_all_str('done')) > 0
- calendarize = len(self._form_data.get_all_str('calendarize')) > 0
- comment = self._form_data.get_str('comment', ignore_strict=True)
- for v in fill_fors.values():
- if v.startswith('make_empty_'):
- processes_to_make_empty += [int(v[11:])]
- elif v.startswith('make_full_'):
- processes_to_make_full += [int(v[10:])]
- elif v != 'ignore':
- adopted_child_ids += [int(v)]
+ step_fillers = self._form_data.get_all_str('step_filler')
+ to_update = {
+ 'is_done': self._form_data.get_bool('done'),
+ 'calendarize': self._form_data.get_bool('calendarize'),
+ 'comment': self._form_data.get_str('comment', ignore_strict=True)}
+ try:
+ to_update['effort'] = self._form_data.get_float_or_none('effort')
+ except NotFoundException:
+ pass
+ cond_rel_id_lists = [self._form_data.get_all_int(name) for name in
+ ['conditions', 'blockers', 'enables', 'disables']]
+ for filler in [f for f in step_fillers if f != 'ignore']:
+ target_id: int
+ to_int = filler
+ for prefix in [p for p in ['make_empty_', 'make_full_']
+ if filler.startswith(p)]:
+ to_int = filler[len(prefix):]
+ try:
+ target_id = int(to_int)
+ except ValueError as e:
+ msg = 'bad fill_for target: {filler}'
+ raise BadFormatException(msg) from e
+ if filler.startswith('make_empty_'):
+ processes_to_make_empty += [target_id]
+ elif filler.startswith('make_full_'):
+ processes_to_make_full += [target_id]
+ else:
+ adopted_child_ids += [target_id]
to_remove = []
for child in todo.children:
assert isinstance(child.id_, int)
child = Todo.by_id(self.conn, id_)
todo.remove_child(child)
for child_id in adopted_child_ids:
- if child_id in [c.id_ for c in todo.children]:
- continue
- child = Todo.by_id(self.conn, child_id)
- todo.add_child(child)
+ if child_id not in [c.id_ for c in todo.children]:
+ todo.add_child(Todo.by_id(self.conn, child_id))
for process_id in processes_to_make_empty:
process = Process.by_id(self.conn, process_id)
made = Todo(None, process, False, todo.date)
for process_id in processes_to_make_full:
made = Todo.create_with_children(self.conn, process_id, todo.date)
todo.add_child(made)
- todo.effort = float(effort) if effort else None
- todo.set_conditions(self.conn, conditions)
- todo.set_blockers(self.conn, blockers)
- todo.set_enables(self.conn, enables)
- todo.set_disables(self.conn, disables)
- todo.is_done = is_done
- todo.calendarize = calendarize
- todo.comment = comment
+ todo.set_condition_relations(self.conn, *cond_rel_id_lists)
+ todo.update_attrs(**to_update)
+ # todo.save() may destroy Todo if .effort < 0, so retrieve .id_ early
+ url = f'/todo?id={todo.id_}'
todo.save(self.conn)
- return f'/todo?id={todo.id_}'
+ return url
def do_POST_process_descriptions(self) -> str:
"""Update history timestamps for Process.description."""
"""Update or insert Process of ?id= and fields defined in postvars."""
# pylint: disable=too-many-locals
# pylint: disable=too-many-statements
- title = self._form_data.get_str('title')
- description = self._form_data.get_str('description')
- effort = self._form_data.get_float('effort')
+ try:
+ title = self._form_data.get_str('title')
+ description = self._form_data.get_str('description')
+ effort = self._form_data.get_float('effort')
+ except NotFoundException as e:
+ raise BadFormatException from e
conditions = self._form_data.get_all_int('conditions')
blockers = self._form_data.get_all_int('blockers')
enables = self._form_data.get_all_int('enables')
disables = self._form_data.get_all_int('disables')
- calendarize = self._form_data.get_all_str('calendarize') != []
+ calendarize = self._form_data.get_bool('calendarize')
suppresses = self._form_data.get_all_int('suppresses')
step_of = self._form_data.get_all_str('step_of')
keep_steps = self._form_data.get_all_int('keep_step')
process.title.set(title)
process.description.set(description)
process.effort.set(effort)
- process.set_conditions(self.conn, conditions)
- process.set_blockers(self.conn, blockers)
- process.set_enables(self.conn, enables)
- process.set_disables(self.conn, disables)
+ process.set_condition_relations(
+ self.conn, conditions, blockers, enables, disables)
process.calendarize = calendarize
process.save(self.conn)
assert isinstance(process.id_, int)
@_delete_or_post(Condition, '/conditions')
def do_POST_condition(self, condition: Condition) -> str:
"""Update/insert Condition of ?id= and fields defined in postvars."""
- is_active = self._form_data.get_str('is_active') == 'True'
- title = self._form_data.get_str('title')
- description = self._form_data.get_str('description')
+ try:
+ is_active = self._form_data.get_str('is_active') == 'True'
+ title = self._form_data.get_str('title')
+ description = self._form_data.get_str('description')
+ except NotFoundException as e:
+ raise BadFormatException(e) from e
condition.is_active = is_active
condition.title.set(title)
condition.description.set(description)
"""Template for, and metadata for, Todos, and their arrangements."""
# pylint: disable=too-many-instance-attributes
table_name = 'processes'
- to_save = ['calendarize']
- to_save_versioned = ['title', 'description', 'effort']
+ to_save_simples = ['calendarize']
to_save_relations = [('process_conditions', 'process', 'conditions', 0),
('process_blockers', 'process', 'blockers', 0),
('process_enables', 'process', 'enables', 0),
('process_step_suppressions', 'process',
'suppressed_steps', 0)]
add_to_dict = ['explicit_steps']
+ versioned_defaults = {'title': 'UNNAMED', 'description': '', 'effort': 1.0}
to_search = ['title.newest', 'description.newest']
can_create_by_id = True
sorters = {'steps': lambda p: len(p.explicit_steps),
def __init__(self, id_: int | None, calendarize: bool = False) -> None:
BaseModel.__init__(self, id_)
ConditionsRelations.__init__(self)
- self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
- self.description = VersionedAttribute(self, 'process_descriptions', '')
- self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
+ for name in ['title', 'description', 'effort']:
+ attr = VersionedAttribute(self, f'process_{name}s',
+ self.versioned_defaults[name])
+ setattr(self, name, attr)
self.explicit_steps: list[ProcessStep] = []
self.suppressed_steps: list[ProcessStep] = []
self.calendarize = calendarize
class ProcessStep(BaseModel[int]):
"""Sub-unit of Processes."""
table_name = 'process_steps'
- to_save = ['owner_id', 'step_process_id', 'parent_step_id']
+ to_save_simples = ['owner_id', 'step_process_id', 'parent_step_id']
def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
parent_step_id: int | None) -> None:
from plomtask.dating import valid_date
-class TodoNode:
+class DictableNode:
+ """Template for TodoNode, TodoOrStepsNode providing .as_dict_and_refs."""
+ # pylint: disable=too-few-public-methods
+ _to_dict: list[str] = []
+
+ def __init__(self, *args: Any) -> None:
+ for i, arg in enumerate(args):
+ setattr(self, self._to_dict[i], arg)
+
+ @property
+ def as_dict_and_refs(self) -> tuple[dict[str, object], list[Any]]:
+ """Return self as json.dumps-ready dict, list of referenced objects."""
+ d = {}
+ refs = []
+ for name in self._to_dict:
+ attr = getattr(self, name)
+ if hasattr(attr, 'id_'):
+ d[name] = attr.id_
+ continue
+ if isinstance(attr, list):
+ d[name] = []
+ for item in attr:
+ item_d, item_refs = item.as_dict_and_refs
+ d[name] += [item_d]
+ for item_ref in [r for r in item_refs if r not in refs]:
+ refs += [item_ref]
+ continue
+ d[name] = attr
+ return d, refs
+
+
+class TodoNode(DictableNode):
"""Collects what's useful to know for Todo/Condition tree display."""
# pylint: disable=too-few-public-methods
todo: Todo
seen: bool
children: list[TodoNode]
+ _to_dict = ['todo', 'seen', 'children']
- def __init__(self,
- todo: Todo,
- seen: bool,
- children: list[TodoNode]) -> None:
- self.todo = todo
- self.seen = seen
- self.children = children
- @property
- def as_dict(self) -> dict[str, object]:
- """Return self as (json.dumps-coompatible) dict."""
- return {'todo': self.todo.id_,
- 'seen': self.seen,
- 'children': [c.as_dict for c in self.children]}
+class TodoOrProcStepNode(DictableNode):
+ """Collect what's useful for Todo-or-ProcessStep tree display."""
+ # pylint: disable=too-few-public-methods
+ node_id: int
+ todo: Todo | None
+ process: Process | None
+ children: list[TodoOrProcStepNode] # pylint: disable=undefined-variable
+ fillable: bool = False
+ _to_dict = ['node_id', 'todo', 'process', 'children', 'fillable']
class Todo(BaseModel[int], ConditionsRelations):
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-public-methods
table_name = 'todos'
- to_save = ['process_id', 'is_done', 'date', 'comment', 'effort',
- 'calendarize']
+ to_save_simples = ['process_id', 'is_done', 'date', 'comment', 'effort',
+ 'calendarize']
to_save_relations = [('todo_conditions', 'todo', 'conditions', 0),
('todo_blockers', 'todo', 'blockers', 0),
('todo_enables', 'todo', 'enables', 0),
@classmethod
def create_with_children(cls, db_conn: DatabaseConnection,
process_id: int, date: str) -> Todo:
- """Create Todo of process for date, ensure children."""
+ """Create Todo of process for date, ensure children demanded by chain.
+
+ At minimum creates Todo of process_id, but checks the respective
+ Process for its step tree, and walks down that to provide the initial
+ Todo with all descendants defined there, either adopting existing
+ Todos, or creating them where necessary.
+ """
def key_order_func(n: ProcessStepsNode) -> int:
assert isinstance(n.process.id_, int)
@property
def title(self) -> VersionedAttribute:
"""Shortcut to .process.title."""
+ assert isinstance(self.process.title, VersionedAttribute)
return self.process.title
@property
self.children.remove(child)
child.parents.remove(self)
+ def update_attrs(self, **kwargs: Any) -> None:
+ """Update self's attributes listed in kwargs."""
+ for k, v in kwargs.items():
+ setattr(self, k, v)
+
def save(self, db_conn: DatabaseConnection) -> None:
"""On save calls, also check if auto-deletion by effort < 0."""
if self.effort and self.effort < 0 and self.is_deletable:
parent: Any, table_name: str, default: str | float) -> None:
self.parent = parent
self.table_name = table_name
- self.default = default
+ self._default = default
self.history: dict[str, str | float] = {}
+ # NB: For tighter mypy testing, we might prefer self.history to be
+ # dict[str, float] | dict[str, str] instead, but my current coding
+ # knowledge only manages to make that work by adding much further
+ # complexity, so let's leave it at that for now …
def __hash__(self) -> int:
history_tuples = tuple((k, v) for k, v in self.history.items())
- hashable = (self.parent.id_, self.table_name, self.default,
+ hashable = (self.parent.id_, self.table_name, self._default,
history_tuples)
return hash(hashable)
"""Return most recent timestamp."""
return sorted(self.history.keys())[-1]
+ @property
+ def value_type_name(self) -> str:
+ """Return string of name of attribute value type."""
+ return type(self._default).__name__
+
@property
def newest(self) -> str | float:
- """Return most recent value, or self.default if self.history empty."""
+ """Return most recent value, or self._default if self.history empty."""
if 0 == len(self.history):
- return self.default
+ return self._default
return self.history[self._newest_timestamp]
def reset_timestamp(self, old_str: str, new_str: str) -> None:
queried_time += ' 23:59:59.999'
sorted_timestamps = sorted(self.history.keys())
if 0 == len(sorted_timestamps):
- return self.default
+ return self._default
selected_timestamp = sorted_timestamps[0]
for timestamp in sorted_timestamps[1:]:
if timestamp > queried_time:
--- /dev/null
+{% extends '_base.html' %}
+
+{% block content %}
+<h3>calendar</h3>
+
+<p><a href="/calendar">normal view</a></p>
+
+<form action="calendar_txt" method="GET">
+from <input name="start" class="date" value="{{start}}" />
+to <input name="end" class="date" value="{{end}}" />
+<input type="submit" value="OK" />
+</form>
+<table>
+
+<pre>{% for day in days %}{% if day.weekday == "Monday" %}
+---{% endif %}{% if day.comment or day.calendarized_todos %}
+{{day.weekday|truncate(2,True,'',0)}} {{day.date}} {{day.comment|e}}{% endif %}{% if day.calendarized_todos%}{% for todo in day.calendarized_todos %}
+* {{todo.title_then|e}}{% if todo.comment %} / {{todo.comment|e}}{% endif %}{% endfor %}{% endif %}{% endfor %}
+</pre>
+{% endblock %}
{% else %}
{{item.process.title.newest|e}}
{% if indent == 0 %}
-· fill: <select name="fill_for_{{item.id_}}">
+· fill: <select name="step_filler">
<option value="ignore">--</option>
<option value="make_empty_{{item.process.id_}}">make empty</option>
<option value="make_full_{{item.process.id_}}">make full</option>
class TestsSansDB(TestCaseSansDB):
"""Tests requiring no DB setup."""
checked_class = Condition
- versioned_defaults_to_test = {'title': 'UNNAMED', 'description': ''}
class TestsWithDB(TestCaseWithDB):
"""Tests requiring DB, but not server setup."""
checked_class = Condition
default_init_kwargs = {'is_active': False}
- test_versioneds = {'title': str, 'description': str}
def test_remove(self) -> None:
"""Test .remove() effects on DB and cache."""
proc = Process(None)
proc.save(self.db_conn)
todo = Todo(None, proc, False, '2024-01-01')
+ todo.save(self.db_conn)
+ # check condition can only be deleted if not depended upon
for depender in (proc, todo):
- assert hasattr(depender, 'save')
- assert hasattr(depender, 'set_conditions')
c = Condition(None)
c.save(self.db_conn)
- depender.save(self.db_conn)
- depender.set_conditions(self.db_conn, [c.id_], 'conditions')
+ depender.set_condition_relations(self.db_conn, [c.id_], [], [], [])
depender.save(self.db_conn)
with self.assertRaises(HandledException):
c.remove(self.db_conn)
- depender.set_conditions(self.db_conn, [], 'conditions')
+ depender.set_condition_relations(self.db_conn, [], [], [], [])
depender.save(self.db_conn)
c.remove(self.db_conn)
def test_fail_POST_condition(self) -> None:
"""Test malformed/illegal POST /condition requests."""
- # check invalid POST payloads
+ # check incomplete POST payloads
url = '/condition'
self.check_post({}, url, 400)
self.check_post({'title': ''}, url, 400)
valid_payload = {'title': '', 'description': '', 'is_active': False}
self.check_post(valid_payload, '/condition?id=foo', 400)
- def test_do_POST_condition(self) -> None:
+ def test_POST_condition(self) -> None:
"""Test (valid) POST /condition and its effect on GET /condition[s]."""
# test valid POST's effect on …
post = {'title': 'foo', 'description': 'oof', 'is_active': False}
- self.check_post(post, '/condition', 302, '/condition?id=1')
+ self.check_post(post, '/condition', redir='/condition?id=1')
# … single /condition
- cond = self.cond_as_dict(titles=['foo'], descriptions=['oof'])
- assert isinstance(cond['_versioned'], dict)
- expected_single = self.GET_condition_dict(cond)
+ expected_cond = self.cond_as_dict(titles=['foo'], descriptions=['oof'])
+ assert isinstance(expected_cond['_versioned'], dict)
+ expected_single = self.GET_condition_dict(expected_cond)
self.check_json_get('/condition?id=1', expected_single)
# … full /conditions
- expected_all = self.GET_conditions_dict([cond])
+ expected_all = self.GET_conditions_dict([expected_cond])
self.check_json_get('/conditions', expected_all)
# test (no) effect of invalid POST to existing Condition on /condition
self.check_post({}, '/condition?id=1', 400)
self.check_json_get('/condition?id=1', expected_single)
# test effect of POST changing title and activeness
post = {'title': 'bar', 'description': 'oof', 'is_active': True}
- self.check_post(post, '/condition?id=1', 302)
- cond['_versioned']['title'][1] = 'bar'
- cond['is_active'] = True
+ self.check_post(post, '/condition?id=1')
+ expected_cond['_versioned']['title'][1] = 'bar'
+ expected_cond['is_active'] = True
self.check_json_get('/condition?id=1', expected_single)
- # test deletion POST's effect on …
- self.check_post({'delete': ''}, '/condition?id=1', 302, '/conditions')
- cond = self.cond_as_dict()
+ # test deletion POST's effect, both to return id=1 into empty single, …
+ self.check_post({'delete': ''}, '/condition?id=1', redir='/conditions')
+ expected_cond = self.cond_as_dict()
assert isinstance(expected_single['_library'], dict)
- expected_single['_library']['Condition'] = self.as_refs([cond])
+ expected_single['_library']['Condition'] = self.as_refs(
+ [expected_cond])
self.check_json_get('/condition?id=1', expected_single)
- # … full /conditions
+ # … and full /conditions into empty list
expected_all['conditions'] = []
expected_all['_library'] = {}
self.check_json_get('/conditions', expected_all)
- def test_do_GET_condition(self) -> None:
+ def test_GET_condition(self) -> None:
"""More GET /condition testing, especially for Process relations."""
# check expected default status codes
self.check_get_defaults('/condition')
# make Condition and two Processes that among them establish all
# possible ConditionsRelations to it, …
cond_post = {'title': 'foo', 'description': 'oof', 'is_active': False}
- self.check_post(cond_post, '/condition', 302, '/condition?id=1')
+ self.check_post(cond_post, '/condition', redir='/condition?id=1')
proc1_post = {'title': 'A', 'description': '', 'effort': 1.0,
'conditions': [1], 'disables': [1]}
proc2_post = {'title': 'B', 'description': '', 'effort': 1.0,
self.post_process(1, proc1_post)
self.post_process(2, proc2_post)
# … then check /condition displays all these properly.
- cond = self.cond_as_dict(titles=['foo'], descriptions=['oof'])
- assert isinstance(cond['id'], int)
- proc1 = self.proc_as_dict(conditions=[cond['id']],
- disables=[cond['id']])
+ cond_expected = self.cond_as_dict(titles=['foo'], descriptions=['oof'])
+ assert isinstance(cond_expected['id'], int)
+ proc1 = self.proc_as_dict(conditions=[cond_expected['id']],
+ disables=[cond_expected['id']])
proc2 = self.proc_as_dict(2, 'B',
- blockers=[cond['id']],
- enables=[cond['id']])
- expected = self.GET_condition_dict(cond)
- assert isinstance(expected['_library'], dict)
- expected['enabled_processes'] = self.as_id_list([proc1])
- expected['disabled_processes'] = self.as_id_list([proc2])
- expected['enabling_processes'] = self.as_id_list([proc2])
- expected['disabling_processes'] = self.as_id_list([proc1])
- expected['_library']['Process'] = self.as_refs([proc1, proc2])
- self.check_json_get('/condition?id=1', expected)
+ blockers=[cond_expected['id']],
+ enables=[cond_expected['id']])
+ display_expected = self.GET_condition_dict(cond_expected)
+ assert isinstance(display_expected['_library'], dict)
+ display_expected['enabled_processes'] = self.as_id_list([proc1])
+ display_expected['disabled_processes'] = self.as_id_list([proc2])
+ display_expected['enabling_processes'] = self.as_id_list([proc2])
+ display_expected['disabling_processes'] = self.as_id_list([proc1])
+ display_expected['_library']['Process'] = self.as_refs([proc1, proc2])
+ self.check_json_get('/condition?id=1', display_expected)
- def test_do_GET_conditions(self) -> None:
+ def test_GET_conditions(self) -> None:
"""Test GET /conditions."""
# test empty result on empty DB, default-settings on empty params
expected = self.GET_conditions_dict([])
self.check_json_get('/conditions', expected)
- # test on meaningless non-empty params (incl. entirely un-used key),
+ # test ignorance of meaningless non-empty params (incl. unknown key),
# that 'sort_by' default to 'title' (even if set to something else, as
# long as without handler) and 'pattern' get preserved
expected['pattern'] = 'bar' # preserved despite zero effect!
+ expected['sort_by'] = 'title' # for clarity (actually already set)
url = '/conditions?sort_by=foo&pattern=bar&foo=x'
self.check_json_get(url, expected)
# test non-empty result, automatic (positive) sorting by title
- post1 = {'is_active': False, 'title': 'foo', 'description': 'oof'}
- post2 = {'is_active': False, 'title': 'bar', 'description': 'rab'}
- post3 = {'is_active': True, 'title': 'baz', 'description': 'zab'}
- self.check_post(post1, '/condition', 302, '/condition?id=1')
- self.check_post(post2, '/condition', 302, '/condition?id=2')
- self.check_post(post3, '/condition', 302, '/condition?id=3')
+ post_cond1 = {'is_active': False, 'title': 'foo', 'description': 'oof'}
+ post_cond2 = {'is_active': False, 'title': 'bar', 'description': 'rab'}
+ post_cond3 = {'is_active': True, 'title': 'baz', 'description': 'zab'}
+ self.check_post(post_cond1, '/condition', redir='/condition?id=1')
+ self.check_post(post_cond2, '/condition', redir='/condition?id=2')
+ self.check_post(post_cond3, '/condition', redir='/condition?id=3')
cond1 = self.cond_as_dict(1, False, ['foo'], ['oof'])
cond2 = self.cond_as_dict(2, False, ['bar'], ['rab'])
cond3 = self.cond_as_dict(3, True, ['baz'], ['zab'])
expected = self.GET_conditions_dict([cond2, cond3, cond1])
self.check_json_get('/conditions', expected)
# test other sortings
- # (NB: by .is_active has two items of =False, their order currently
- # is not explicitly made predictable, so mail fail until we do)
- expected['conditions'] = self.as_id_list([cond1, cond3, cond2])
expected['sort_by'] = '-title'
+ assert isinstance(expected['conditions'], list)
+ expected['conditions'].reverse()
self.check_json_get('/conditions?sort_by=-title', expected)
- expected['conditions'] = self.as_id_list([cond1, cond2, cond3])
expected['sort_by'] = 'is_active'
+ expected['conditions'] = self.as_id_list([cond1, cond2, cond3])
self.check_json_get('/conditions?sort_by=is_active', expected)
- expected['conditions'] = self.as_id_list([cond3, cond1, cond2])
expected['sort_by'] = '-is_active'
+ expected['conditions'].reverse()
self.check_json_get('/conditions?sort_by=-is_active', expected)
# test pattern matching on title
expected = self.GET_conditions_dict([cond2, cond3])
self.check_json_get('/conditions?pattern=ba', expected)
# test pattern matching on description
assert isinstance(expected['_library'], dict)
+ expected['pattern'] = 'of'
expected['conditions'] = self.as_id_list([cond1])
expected['_library']['Condition'] = self.as_refs([cond1])
- expected['pattern'] = 'oo'
- self.check_json_get('/conditions?pattern=oo', expected)
+ self.check_json_get('/conditions?pattern=of', expected)
"""Test Days module."""
-from unittest import TestCase
-from datetime import datetime
+from datetime import datetime, timedelta
from typing import Callable
-from tests.utils import TestCaseWithDB, TestCaseWithServer
-from plomtask.dating import date_in_n_days
+from tests.utils import TestCaseSansDB, TestCaseWithDB, TestCaseWithServer
+from plomtask.dating import date_in_n_days as tested_date_in_n_days
from plomtask.days import Day
+# so far the same as plomtask.dating.DATE_FORMAT, but for testing purposes we
+# want to explicitly state our expectations here indepedently from that
+TESTING_DATE_FORMAT = '%Y-%m-%d'
-class TestsSansDB(TestCase):
+
+class TestsSansDB(TestCaseSansDB):
"""Days module tests not requiring DB setup."""
- legal_ids = ['2024-01-01']
- illegal_ids = ['foo', '2024-02-30', '2024-02-01 23:00:00']
+ checked_class = Day
+ legal_ids = ['2024-01-01', '2024-02-29']
+ illegal_ids = ['foo', '2023-02-29', '2024-02-30', '2024-02-01 23:00:00']
+
+ def test_date_in_n_days(self) -> None:
+ """Test dating.date_in_n_days"""
+ for n in [-100, -2, -1, 0, 1, 2, 1000]:
+ date = datetime.now() + timedelta(days=n)
+ self.assertEqual(tested_date_in_n_days(n),
+ date.strftime(TESTING_DATE_FORMAT))
def test_Day_datetime_weekday_neighbor_dates(self) -> None:
- """Test Day's date parsing."""
+ """Test Day's date parsing and neighbourhood resolution."""
self.assertEqual(datetime(2024, 5, 1), Day('2024-05-01').datetime)
self.assertEqual('Sunday', Day('2024-03-17').weekday)
self.assertEqual('March', Day('2024-03-17').month_name)
self.assertEqual('2023-12-31', Day('2024-01-01').prev_date)
self.assertEqual('2023-03-01', Day('2023-02-28').next_date)
- def test_Day_sorting(self) -> None:
- """Test sorting by .__lt__ and Day.__eq__."""
- day1 = Day('2024-01-01')
- day2 = Day('2024-01-02')
- day3 = Day('2024-01-03')
- days = [day3, day1, day2]
- self.assertEqual(sorted(days), [day1, day2, day3])
-
class TestsWithDB(TestCaseWithDB):
"""Tests requiring DB, but not server setup."""
checked_class = Day
default_ids = ('2024-01-01', '2024-01-02', '2024-01-03')
- def test_Day_by_date_range_filled(self) -> None:
- """Test Day.by_date_range_filled."""
- date1, date2, date3 = self.default_ids
- day1 = Day(date1)
- day2 = Day(date2)
- day3 = Day(date3)
- for day in [day1, day2, day3]:
- day.save(self.db_conn)
- # check date range includes limiter days
- self.assertEqual(Day.by_date_range_filled(self.db_conn, date1, date3),
- [day1, day2, day3])
- # check first date range value excludes what's earlier
- self.assertEqual(Day.by_date_range_filled(self.db_conn, date2, date3),
- [day2, day3])
- # check second date range value excludes what's later
- self.assertEqual(Day.by_date_range_filled(self.db_conn, date1, date2),
- [day1, day2])
- # check swapped (impossible) date range returns emptiness
- self.assertEqual(Day.by_date_range_filled(self.db_conn, date3, date1),
- [])
- # check fill_gaps= instantiates unsaved dates within date range
- # (but does not store them)
- day5 = Day('2024-01-05')
- day6 = Day('2024-01-06')
- day6.save(self.db_conn)
- day7 = Day('2024-01-07')
- self.assertEqual(Day.by_date_range_filled(self.db_conn,
- day5.date, day7.date),
- [day5, day6, day7])
- self.check_identity_with_cache_and_db([day1, day2, day3, day6])
- # check 'today' is interpreted as today's date
- today = Day(date_in_n_days(0))
- self.assertEqual(Day.by_date_range_filled(self.db_conn,
- 'today', 'today'),
- [today])
- prev_day = Day(date_in_n_days(-1))
- next_day = Day(date_in_n_days(1))
- self.assertEqual(Day.by_date_range_filled(self.db_conn,
- 'yesterday', 'tomorrow'),
- [prev_day, today, next_day])
+ def test_Day_by_date_range_with_limits(self) -> None:
+ """Test .by_date_range_with_limits."""
+ self.check_by_date_range_with_limits('id', set_id_field=False)
+
+ def test_Day_with_filled_gaps(self) -> None:
+ """Test .with_filled_gaps."""
+
+ def test(range_indexes: tuple[int, int], indexes_to_provide: list[int]
+ ) -> None:
+ start_i, end_i = range_indexes
+ days_provided = []
+ days_expected = days_sans_comment[:]
+ for i in indexes_to_provide:
+ day_with_comment = days_with_comment[i]
+ days_provided += [day_with_comment]
+ days_expected[i] = day_with_comment
+ days_expected = days_expected[start_i:end_i+1]
+ start, end = dates[start_i], dates[end_i]
+ days_result = self.checked_class.with_filled_gaps(days_provided,
+ start, end)
+ self.assertEqual(days_result, days_expected)
+
+ # for provided Days we use those from days_with_comment, to identify
+ # them against same-dated mere filler Days by their lack of comment
+ # (identity with Day at the respective position in days_sans_comment)
+ dates = [f'2024-02-0{n+1}' for n in range(9)]
+ days_with_comment = [Day(date, comment=date[-1:]) for date in dates]
+ days_sans_comment = [Day(date, comment='') for date in dates]
+ # check provided Days recognizable in (full-range) interval
+ test((0, 8), [0, 4, 8])
+ # check limited range, but limiting Days provided
+ test((2, 6), [2, 5, 6])
+ # check Days within range but beyond provided Days also filled in
+ test((1, 7), [2, 5])
+ # check provided Days beyond range ignored
+ test((3, 5), [1, 2, 4, 6, 7])
+ # check inversion of start_date and end_date returns empty list
+ test((5, 3), [2, 4, 6])
+ # check empty provision still creates filler elements in interval
+ test((3, 5), [])
+ # check single-element selection creating only filler beyond provided
+ test((1, 1), [2, 4, 6])
+ # check (un-saved) filler Days don't show up in cache or DB
+ # dates = [f'2024-02-0{n}' for n in range(1, 6)]
+ day = Day(dates[3])
+ day.save(self.db_conn)
+ self.checked_class.with_filled_gaps([day], dates[0], dates[-1])
+ self.check_identity_with_cache_and_db([day])
+ # check 'today', 'yesterday', 'tomorrow' are interpreted
+ yesterday = Day('yesterday')
+ tomorrow = Day('tomorrow')
+ today = Day('today')
+ result = self.checked_class.with_filled_gaps([today], 'yesterday',
+ 'tomorrow')
+ self.assertEqual(result, [yesterday, today, tomorrow])
class TestsWithServer(TestCaseWithServer):
"""Tests against our HTTP server/handler (and database)."""
- @classmethod
- def GET_day_dict(cls, date: str) -> dict[str, object]:
- """Return JSON of GET /day to expect."""
- # day: dict[str, object] = {'id': date, 'comment': '', 'todos': []}
- day = cls._day_as_dict(date)
- d: dict[str, object] = {'day': date,
- 'top_nodes': [],
- 'make_type': '',
- 'enablers_for': {},
- 'disablers_for': {},
- 'conditions_present': [],
- 'processes': [],
- '_library': {'Day': cls.as_refs([day])}}
- return d
+ @staticmethod
+ def _testing_date_in_n_days(n: int) -> str:
+ """Return in TEST_DATE_FORMAT date from today + n days.
- @classmethod
- def GET_calendar_dict(cls, start: int, end: int) -> dict[str, object]:
- """Return JSON of GET /calendar to expect."""
- today_date = date_in_n_days(0)
- start_date = date_in_n_days(start)
- end_date = date_in_n_days(end)
- dates = [date_in_n_days(i) for i in range(start, end+1)]
- days = [cls._day_as_dict(d) for d in dates]
- library = {'Day': cls.as_refs(days)} if len(days) > 0 else {}
- return {'today': today_date, 'start': start_date, 'end': end_date,
- 'days': dates, '_library': library}
+ As with TESTING_DATE_FORMAT, we assume this equal the original's code
+ at plomtask.dating.date_in_n_days, but want to state our expectations
+ explicitly to rule out importing issues from the original.
+ """
+ date = datetime.now() + timedelta(days=n)
+ return date.strftime(TESTING_DATE_FORMAT)
@staticmethod
- def _todo_as_dict(id_: int = 1,
- process_id: int = 1,
- date: str = '2024-01-01',
- conditions: None | list[int] = None,
- disables: None | list[int] = None,
- blockers: None | list[int] = None,
- enables: None | list[int] = None
- ) -> dict[str, object]:
- """Return JSON of Todo to expect."""
- # pylint: disable=too-many-arguments
- d = {'id': id_,
- 'date': date,
- 'process_id': process_id,
- 'is_done': False,
- 'calendarize': False,
- 'comment': '',
- 'children': [],
- 'parents': [],
- 'effort': None,
- 'conditions': conditions if conditions else [],
- 'disables': disables if disables else [],
- 'blockers': blockers if blockers else [],
- 'enables': enables if enables else []}
- return d
+ def _day_as_dict(date: str) -> dict[str, object]:
+ return {'id': date, 'comment': '', 'todos': []}
@staticmethod
def _todo_node_as_dict(todo_id: int) -> dict[str, object]:
return {'children': [], 'seen': False, 'todo': todo_id}
@staticmethod
- def _day_as_dict(date: str) -> dict[str, object]:
- return {'id': date, 'comment': '', 'todos': []}
-
- @staticmethod
- def _post_batch(list_of_args: list[list[object]],
- names_of_simples: list[str],
- names_of_versioneds: list[str],
- f_as_dict: Callable[..., dict[str, object]],
- f_to_post: Callable[..., None | dict[str, object]]
- ) -> list[dict[str, object]]:
- """Post expected=f_as_dict(*args) as input to f_to_post, for many."""
- expecteds = []
- for args in list_of_args:
- expecteds += [f_as_dict(*args)]
- for expected in expecteds:
- assert isinstance(expected['_versioned'], dict)
- post = {}
- for name in names_of_simples:
- post[name] = expected[name]
- for name in names_of_versioneds:
- post[name] = expected['_versioned'][name][0]
- f_to_post(expected['id'], post)
- return expecteds
+ def _post_args_return_expectation(
+ args: list[object],
+ names_of_simples: list[str],
+ names_of_versioneds: list[str],
+ f_as_dict: Callable[..., dict[str, object]],
+ f_to_post: Callable[..., None | dict[str, object]]
+ ) -> dict[str, object]:
+ """Create expected=f_as_dict(*args), post as names_* with f_to_post."""
+ expected = f_as_dict(*args)
+ assert isinstance(expected['_versioned'], dict)
+ to_post = {}
+ for name in names_of_simples:
+ to_post[name] = expected[name]
+ for name in names_of_versioneds:
+ to_post[name] = expected['_versioned'][name][0]
+ f_to_post(expected['id'], to_post)
+ return expected
def _post_day(self, params: str = '',
form_data: None | dict[str, object] = None,
redir_to = f'{target}&make_type={form_data["make_type"]}'
self.check_post(form_data, target, status, redir_to)
+ @classmethod
+ def GET_day_dict(cls, date: str) -> dict[str, object]:
+ """Return JSON of GET /day to expect."""
+ day = cls._day_as_dict(date)
+ d: dict[str, object] = {'day': date,
+ 'top_nodes': [],
+ 'make_type': '',
+ 'enablers_for': {},
+ 'disablers_for': {},
+ 'conditions_present': [],
+ 'processes': [],
+ '_library': {'Day': cls.as_refs([day])}}
+ return d
+
+ @classmethod
+ def GET_calendar_dict(cls, start: int, end: int) -> dict[str, object]:
+ """Return JSON of GET /calendar to expect.
+
+ NB: the date string list to key 'days' implies/expects a continuous (=
+ gaps filled) alphabetical order of dates by virtue of range(start,
+ end+1) and date_in_n_days.
+ """
+ today_date = cls._testing_date_in_n_days(0)
+ start_date = cls._testing_date_in_n_days(start)
+ end_date = cls._testing_date_in_n_days(end)
+ dates = [cls._testing_date_in_n_days(i) for i in range(start, end+1)]
+ days = [cls._day_as_dict(d) for d in dates]
+ library = {'Day': cls.as_refs(days)} if len(days) > 0 else {}
+ return {'today': today_date, 'start': start_date, 'end': end_date,
+ 'days': dates, '_library': library}
+
def test_basic_GET_day(self) -> None:
"""Test basic (no Processes/Conditions/Todos) GET /day basics."""
# check illegal date parameters
self.check_get('/day?date=foo', 400)
self.check_get('/day?date=2024-02-30', 400)
# check undefined day
- date = date_in_n_days(0)
+ date = self._testing_date_in_n_days(0)
expected = self.GET_day_dict(date)
self.check_json_get('/day', expected)
- # NB: GET ?date="today"/"yesterday"/"tomorrow" in test_basic_POST_day
- # check 'make_type' GET parameter affects immediate reply, but …
+ # check defined day, with and without make_type parameter
date = '2024-01-01'
expected = self.GET_day_dict(date)
expected['make_type'] = 'bar'
self.check_json_get(f'/day?date={date}&make_type=bar', expected)
- # … not any following, …
- expected['make_type'] = ''
- self.check_json_get(f'/day?date={date}', expected)
- # … not even when part of a POST request
- post: dict[str, object] = {'day_comment': '', 'make_type': 'foo'}
- self._post_day(f'date={date}', post)
- self.check_json_get(f'/day?date={date}', expected)
+ # check parsing of 'yesterday', 'today', 'tomorrow'
+ for name, dist in [('yesterday', -1), ('today', 0), ('tomorrow', +1)]:
+ date = self._testing_date_in_n_days(dist)
+ expected = self.GET_day_dict(date)
+ self.check_json_get(f'/day?date={name}', expected)
def test_fail_POST_day(self) -> None:
"""Test malformed/illegal POST /day requests."""
self.check_post(post, '/day?date=foo', 400)
def test_basic_POST_day(self) -> None:
- """Test basic (no Todos) POST /day.
+ """Test basic (no Processes/Conditions/Todos) POST /day.
- Check POST (& GET!) requests properly parse 'today', 'tomorrow',
- 'yesterday', and actual date strings;
+ Check POST requests properly parse 'today', 'tomorrow', 'yesterday',
+ and actual date strings;
preserve 'make_type' setting in redirect even if nonsensical;
- and store 'day_comment'
+ and store 'day_comment'.
"""
for name, dist, test_str in [('2024-01-01', None, 'a'),
('today', 0, 'b'),
('yesterday', -1, 'c'),
('tomorrow', +1, 'd')]:
- date = name if dist is None else date_in_n_days(dist)
+ date = name if dist is None else self._testing_date_in_n_days(dist)
post = {'day_comment': test_str, 'make_type': f'x:{test_str}'}
post_url = f'/day?date={name}'
redir_url = f'{post_url}&make_type={post["make_type"]}'
def test_GET_day_with_processes_and_todos(self) -> None:
"""Test GET /day displaying Processes and Todos (no trees)."""
date = '2024-01-01'
- # check Processes get displayed in ['processes'] and ['_library']
- procs_data = [[1, 'foo', 'oof', 1.1], [2, 'bar', 'rab', 0.9]]
- procs_expected = self._post_batch(procs_data, [],
- ['title', 'description', 'effort'],
- self.proc_as_dict, self.post_process)
+ # check Processes get displayed in ['processes'] and ['_library'],
+ # even without any Todos referencing them
+ procs_data = [[1, 'foo', 'oof', 1.1], # id, title, desc, effort
+ [2, 'bar', 'rab', 0.9]]
+ procs_expected = []
+ for p_data in procs_data:
+ procs_expected += [self._post_args_return_expectation(
+ p_data, [], ['title', 'description', 'effort'],
+ self.proc_as_dict, self.post_process)]
expected = self.GET_day_dict(date)
assert isinstance(expected['_library'], dict)
expected['processes'] = self.as_id_list(procs_expected)
expected['_library']['Process'] = self.as_refs(procs_expected)
- self._post_day(f'date={date}')
self.check_json_get(f'/day?date={date}', expected)
# post Todos of either process and check their display
post_day: dict[str, object]
post_day = {'day_comment': '', 'make_type': '', 'new_todo': [1, 2]}
- todos = [self._todo_as_dict(1, 1, date),
- self._todo_as_dict(2, 2, date)]
+ todos = [self.todo_as_dict(1, 1, date), self.todo_as_dict(2, 2, date)]
expected['_library']['Todo'] = self.as_refs(todos)
expected['_library']['Day'][date]['todos'] = self.as_id_list(todos)
nodes = [self._todo_node_as_dict(1), self._todo_node_as_dict(2)]
"""Test GET /day displaying Conditions and their relations."""
date = '2024-01-01'
# add Process with Conditions and their Todos, check display
- conds_data = [[1, False, ['A'], ['a']], [2, True, ['B'], ['b']]]
- conds_expected = self._post_batch(
- conds_data, ['is_active'], ['title', 'description'],
+ conds_data = [[1, False, ['A'], ['a']], # id, is_active, title, desc
+ [2, True, ['B'], ['b']]]
+ conds_expected = []
+ for c_data in conds_data:
+ conds_expected += [self._post_args_return_expectation(
+ c_data, ['is_active'], ['title', 'description'],
self.cond_as_dict,
- lambda x, y: self.check_post(y, f'/condition?id={x}', 302))
- cond_names = ['conditions', 'disables', 'blockers', 'enables']
- procs_data = [[1, 'foo', 'oof', 1.1, [1], [1], [2], [2]],
+ lambda x, y: self.check_post(y, f'/condition?id={x}'))]
+ procs_data = [ # id, title, desc, effort,
+ # conditions, disables, blockers, enables
+ [1, 'foo', 'oof', 1.1, [1], [1], [2], [2]],
[2, 'bar', 'rab', 0.9, [2], [2], [1], [1]]]
- procs_expected = self._post_batch(procs_data, cond_names,
- ['title', 'description', 'effort'],
- self.proc_as_dict, self.post_process)
+ procs_expected = []
+ for p_data in procs_data:
+ procs_expected += [self._post_args_return_expectation(
+ p_data,
+ ['conditions', 'disables', 'blockers', 'enables'],
+ ['title', 'description', 'effort'],
+ self.proc_as_dict, self.post_process)]
expected = self.GET_day_dict(date)
assert isinstance(expected['_library'], dict)
expected['processes'] = self.as_id_list(procs_expected)
# add Todos in relation to Conditions, check consequences
post_day: dict[str, object]
post_day = {'day_comment': '', 'make_type': '', 'new_todo': [1, 2]}
- todos = [self._todo_as_dict(1, 1, date, [1], [1], [2], [2]),
- self._todo_as_dict(2, 2, date, [2], [2], [1], [1])]
+ todos = [ # id, process_id, date, conds, disables, blockers, enables
+ self.todo_as_dict(1, 1, date, [1], [1], [2], [2]),
+ self.todo_as_dict(2, 2, date, [2], [2], [1], [1])]
expected['_library']['Todo'] = self.as_refs(todos)
expected['_library']['Day'][date]['todos'] = self.as_id_list(todos)
nodes = [self._todo_node_as_dict(1), self._todo_node_as_dict(2)]
# check illegal date range delimiters
self.check_get('/calendar?start=foo', 400)
self.check_get('/calendar?end=foo', 400)
- # check default range without saved days
+ # check default range for expected selection/order without saved days
expected = self.GET_calendar_dict(-1, 366)
self.check_json_get('/calendar', expected)
self.check_json_get('/calendar?start=&end=', expected)
- # check named days as delimiters
+ # check with named days as delimiters
expected = self.GET_calendar_dict(-1, +1)
self.check_json_get('/calendar?start=yesterday&end=tomorrow', expected)
# check zero-element range
expected = self.GET_calendar_dict(+1, 0)
self.check_json_get('/calendar?start=tomorrow&end=today', expected)
- # check saved day shows up in results with proven by its comment
+ # check saved day shows up in results, proven by its comment
post_day: dict[str, object] = {'day_comment': 'foo', 'make_type': ''}
- date1 = date_in_n_days(-2)
- self._post_day(f'date={date1}', post_day)
- start_date = date_in_n_days(-5)
- end_date = date_in_n_days(+5)
+ date = self._testing_date_in_n_days(-2)
+ self._post_day(f'date={date}', post_day)
+ start_date = self._testing_date_in_n_days(-5)
+ end_date = self._testing_date_in_n_days(+5)
url = f'/calendar?start={start_date}&end={end_date}'
expected = self.GET_calendar_dict(-5, +5)
assert isinstance(expected['_library'], dict)
- expected['_library']['Day'][date1]['comment'] = post_day['day_comment']
+ expected['_library']['Day'][date]['comment'] = post_day['day_comment']
self.check_json_get(url, expected)
from unittest import TestCase
from tests.utils import TestCaseWithServer
from plomtask.http import InputsParser
-from plomtask.exceptions import BadFormatException
+from plomtask.exceptions import BadFormatException, NotFoundException
class TestsSansServer(TestCase):
self.assertEqual('', parser.get_str('foo'))
self.assertEqual('bar', parser.get_str('foo', 'bar'))
parser.strict = True
- with self.assertRaises(BadFormatException):
+ with self.assertRaises(NotFoundException):
parser.get_str('foo')
- with self.assertRaises(BadFormatException):
+ with self.assertRaises(NotFoundException):
parser.get_str('foo', 'bar')
parser = InputsParser({'foo': []}, False)
self.assertEqual('bar', parser.get_str('foo', 'bar'))
- with self.assertRaises(BadFormatException):
+ with self.assertRaises(NotFoundException):
InputsParser({'foo': []}, True).get_str('foo', 'bar')
for strictness in (False, True):
parser = InputsParser({'foo': ['baz']}, strictness)
def test_InputsParser_get_float(self) -> None:
"""Test InputsParser.get_float on strict and non-strict."""
for strictness in (False, True):
- with self.assertRaises(BadFormatException):
- InputsParser({}, strictness).get_float('foo')
- with self.assertRaises(BadFormatException):
- InputsParser({'foo': []}, strictness).get_float('foo')
with self.assertRaises(BadFormatException):
InputsParser({'foo': ['']}, strictness).get_float('foo')
with self.assertRaises(BadFormatException):
self.assertEqual(0.1, parser.get_float('foo'))
parser = InputsParser({'foo': ['1.23', '456']}, strictness)
self.assertEqual(1.23, parser.get_float('foo'))
+ if strictness:
+ with self.assertRaises(NotFoundException):
+ InputsParser({}, strictness).get_float('foo')
+ with self.assertRaises(NotFoundException):
+ InputsParser({'foo': []}, strictness).get_float('foo')
+ else:
+ with self.assertRaises(BadFormatException):
+ InputsParser({}, strictness).get_float('foo')
+ with self.assertRaises(BadFormatException):
+ InputsParser({'foo': []}, strictness).get_float('foo')
+
+ def test_InputsParser_get_float_or_none(self) -> None:
+ """Test InputsParser.get_float_or_none on strict and non-strict."""
+ for strictness in (False, True):
+ with self.assertRaises(BadFormatException):
+ InputsParser({'foo': ['bar']}, strictness).\
+ get_float_or_none('foo')
+ parser = InputsParser({'foo': ['']}, strictness)
+ self.assertEqual(None, parser.get_float_or_none('foo'))
+ parser = InputsParser({'foo': ['0']}, strictness)
+ self.assertEqual(0, parser.get_float_or_none('foo'))
+ parser = InputsParser({'foo': ['0.1']}, strictness)
+ self.assertEqual(0.1, parser.get_float_or_none('foo'))
+ parser = InputsParser({'foo': ['1.23', '456']}, strictness)
+ self.assertEqual(1.23, parser.get_float_or_none('foo'))
+ if strictness:
+ with self.assertRaises(NotFoundException):
+ InputsParser({}, strictness).get_float_or_none('foo')
+ with self.assertRaises(NotFoundException):
+ InputsParser({'foo': []}, strictness).get_float_or_none('foo')
+ else:
+ parser = InputsParser({}, strictness)
+ self.assertEqual(None, parser.get_float_or_none('foo'))
+ parser = InputsParser({'foo': []}, strictness)
+ self.assertEqual(None, parser.get_float_or_none('foo'))
+
+ def test_InputsParser_get_bool(self) -> None:
+ """Test InputsParser.get_all_str on strict and non-strict."""
+ for strictness in (False, True):
+ parser = InputsParser({}, strictness)
+ self.assertEqual(False, parser.get_bool('foo'))
+ parser = InputsParser({'val': ['true']}, strictness)
+ self.assertEqual(False, parser.get_bool('foo'))
+ parser = InputsParser({'val': ['True']}, strictness)
+ self.assertEqual(False, parser.get_bool('foo'))
+ parser = InputsParser({'val': ['1']}, strictness)
+ self.assertEqual(False, parser.get_bool('foo'))
+ parser = InputsParser({'val': ['foo']}, strictness)
+ self.assertEqual(False, parser.get_bool('foo'))
+ parser = InputsParser({'foo': []}, strictness)
+ self.assertEqual(False, parser.get_bool('foo'))
+ parser = InputsParser({'foo': ['None']}, strictness)
+ self.assertEqual(True, parser.get_bool('foo'))
+ parser = InputsParser({'foo': ['0']}, strictness)
+ self.assertEqual(True, parser.get_bool('foo'))
+ parser = InputsParser({'foo': ['']}, strictness)
+ self.assertEqual(True, parser.get_bool('foo'))
+ parser = InputsParser({'foo': ['bar']}, strictness)
+ self.assertEqual(True, parser.get_bool('foo'))
+ parser = InputsParser({'foo': ['bar', 'baz']}, strictness)
+ self.assertEqual(True, parser.get_bool('foo'))
+ parser = InputsParser({'foo': ['False']}, strictness)
+ self.assertEqual(True, parser.get_bool('foo'))
def test_InputsParser_get_all_str(self) -> None:
"""Test InputsParser.get_all_str on strict and non-strict."""
class TestsSansDB(TestCaseSansDB):
"""Module tests not requiring DB setup."""
checked_class = Process
- versioned_defaults_to_test = {'title': 'UNNAMED', 'description': '',
- 'effort': 1.0}
class TestsSansDBProcessStep(TestCaseSansDB):
"""Module tests not requiring DB setup."""
checked_class = ProcessStep
- default_init_args = [2, 3, 4]
+ default_init_kwargs = {'owner_id': 2, 'step_process_id': 3,
+ 'parent_step_id': 4}
class TestsWithDB(TestCaseWithDB):
"""Module tests requiring DB setup."""
checked_class = Process
- test_versioneds = {'title': str, 'description': str, 'effort': float}
def three_processes(self) -> tuple[Process, Process, Process]:
"""Return three saved processes."""
set_1 = [c1, c2]
set_2 = [c2, c3]
set_3 = [c1, c3]
- p.set_conditions(self.db_conn, [c.id_ for c in set_1
- if isinstance(c.id_, int)])
- p.set_enables(self.db_conn, [c.id_ for c in set_2
- if isinstance(c.id_, int)])
- p.set_disables(self.db_conn, [c.id_ for c in set_3
- if isinstance(c.id_, int)])
+ conds = [c.id_ for c in set_1 if isinstance(c.id_, int)]
+ enables = [c.id_ for c in set_2 if isinstance(c.id_, int)]
+ disables = [c.id_ for c in set_3 if isinstance(c.id_, int)]
+ p.set_condition_relations(self.db_conn, conds, [], enables, disables)
p.save(self.db_conn)
return p, set_1, set_2, set_3
"""Test setting Process.conditions/enables/disables."""
p = Process(None)
p.save(self.db_conn)
- for target in ('conditions', 'enables', 'disables'):
- method = getattr(p, f'set_{target}')
+ targets = ['conditions', 'blockers', 'enables', 'disables']
+ for i, target in enumerate(targets):
c1, c2 = Condition(None), Condition(None)
c1.save(self.db_conn)
c2.save(self.db_conn)
assert isinstance(c1.id_, int)
assert isinstance(c2.id_, int)
- method(self.db_conn, [])
+ args: list[list[int]] = [[], [], [], []]
+ args[i] = []
+ p.set_condition_relations(self.db_conn, *args)
self.assertEqual(getattr(p, target), [])
- method(self.db_conn, [c1.id_])
+ args[i] = [c1.id_]
+ p.set_condition_relations(self.db_conn, *args)
self.assertEqual(getattr(p, target), [c1])
- method(self.db_conn, [c2.id_])
+ args[i] = [c2.id_]
+ p.set_condition_relations(self.db_conn, *args)
self.assertEqual(getattr(p, target), [c2])
- method(self.db_conn, [c1.id_, c2.id_])
+ args[i] = [c1.id_, c2.id_]
+ p.set_condition_relations(self.db_conn, *args)
self.assertEqual(getattr(p, target), [c1, c2])
def test_remove(self) -> None:
'_library': library}
return d
- @staticmethod
- def procstep_as_dict(id_: int,
- owner_id: int,
- step_process_id: int,
- parent_step_id: int | None = None
- ) -> dict[str, object]:
- """Return JSON of Process to expect."""
- return {'id': id_,
- 'owner_id': owner_id,
- 'step_process_id': step_process_id,
- 'parent_step_id': parent_step_id}
-
def test_GET_processes(self) -> None:
"""Test GET /processes."""
# pylint: disable=too-many-statements
"""Test Todos module."""
+from typing import Any
from tests.utils import TestCaseSansDB, TestCaseWithDB, TestCaseWithServer
from plomtask.todos import Todo, TodoNode
from plomtask.processes import Process, ProcessStep
class TestsWithDB(TestCaseWithDB, TestCaseSansDB):
"""Tests requiring DB, but not server setup.
- NB: We subclass TestCaseSansDB too, to pull in its .test_id_validation,
- which for Todo wouldn't run without a DB being set up due to the need for
- Processes with set IDs.
+ NB: We subclass TestCaseSansDB too, to run any tests there that due to any
+ Todo requiring a _saved_ Process wouldn't run without a DB.
"""
checked_class = Todo
default_init_kwargs = {'process': None, 'is_done': False,
'date': '2024-01-01'}
- # solely used for TestCaseSansDB.test_id_setting
- default_init_args = [None, False, '2024-01-01']
def setUp(self) -> None:
super().setUp()
self.cond2 = Condition(None)
self.cond2.save(self.db_conn)
self.default_init_kwargs['process'] = self.proc
- self.default_init_args[0] = self.proc
def test_Todo_init(self) -> None:
"""Test creation of Todo and what they default to."""
process.save(self.db_conn)
assert isinstance(self.cond1.id_, int)
assert isinstance(self.cond2.id_, int)
- process.set_conditions(self.db_conn, [self.cond1.id_, self.cond2.id_])
- process.set_enables(self.db_conn, [self.cond1.id_])
- process.set_disables(self.db_conn, [self.cond2.id_])
+ process.set_condition_relations(self.db_conn,
+ [self.cond1.id_, self.cond2.id_], [],
+ [self.cond1.id_], [self.cond2.id_])
todo_no_id = Todo(None, process, False, self.date1)
self.assertEqual(todo_no_id.conditions, [self.cond1, self.cond2])
self.assertEqual(todo_no_id.enables, [self.cond1])
with self.assertRaises(BadFormatException):
self.assertEqual(Todo.by_date(self.db_conn, 'foo'), [])
+ def test_Todo_by_date_range_with_limits(self) -> None:
+ """Test .by_date_range_with_limits."""
+ self.check_by_date_range_with_limits('day')
+
def test_Todo_on_conditions(self) -> None:
"""Test effect of Todos on Conditions."""
assert isinstance(self.cond1.id_, int)
assert isinstance(self.cond2.id_, int)
todo = Todo(None, self.proc, False, self.date1)
todo.save(self.db_conn)
- todo.set_enables(self.db_conn, [self.cond1.id_])
- todo.set_disables(self.db_conn, [self.cond2.id_])
+ todo.set_condition_relations(self.db_conn, [], [],
+ [self.cond1.id_], [self.cond2.id_])
todo.is_done = True
self.assertEqual(self.cond1.is_active, True)
self.assertEqual(self.cond2.is_active, False)
todo_1.is_done = True
todo_2.is_done = True
todo_2.is_done = False
- todo_2.set_conditions(self.db_conn, [self.cond1.id_])
+ todo_2.set_condition_relations(
+ self.db_conn, [self.cond1.id_], [], [], [])
with self.assertRaises(BadFormatException):
todo_2.is_done = True
self.cond1.is_active = True
def test_Todo_step_tree(self) -> None:
"""Test self-configuration of TodoStepsNode tree for Day view."""
+
+ def todo_node_as_dict(node: TodoNode) -> dict[str, object]:
+ return {'todo': node.todo.id_, 'seen': node.seen,
+ 'children': [todo_node_as_dict(c) for c in node.children]}
+
todo_1 = Todo(None, self.proc, False, self.date1)
todo_1.save(self.db_conn)
assert isinstance(todo_1.id_, int)
# test minimum
node_0 = TodoNode(todo_1, False, [])
- self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict)
+ cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree(set()))
+ cmp_1_dict = todo_node_as_dict(node_0)
+ self.assertEqual(cmp_0_dict, cmp_1_dict)
# test non_emtpy seen_todo does something
node_0.seen = True
- self.assertEqual(todo_1.get_step_tree({todo_1.id_}).as_dict,
- node_0.as_dict)
+ cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree({todo_1.id_}))
+ cmp_1_dict = todo_node_as_dict(node_0)
+ self.assertEqual(cmp_0_dict, cmp_1_dict)
# test child shows up
todo_2 = Todo(None, self.proc, False, self.date1)
todo_2.save(self.db_conn)
node_2 = TodoNode(todo_2, False, [])
node_0.children = [node_2]
node_0.seen = False
- self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict)
+ cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree(set()))
+ cmp_1_dict = todo_node_as_dict(node_0)
+ self.assertEqual(cmp_0_dict, cmp_1_dict)
# test child shows up with child
todo_3 = Todo(None, self.proc, False, self.date1)
todo_3.save(self.db_conn)
todo_2.add_child(todo_3)
node_3 = TodoNode(todo_3, False, [])
node_2.children = [node_3]
- self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict)
+ cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree(set()))
+ cmp_1_dict = todo_node_as_dict(node_0)
+ self.assertEqual(cmp_0_dict, cmp_1_dict)
# test same todo can be child-ed multiple times at different locations
todo_1.add_child(todo_3)
node_4 = TodoNode(todo_3, True, [])
node_0.children += [node_4]
- self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict)
+ cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree(set()))
+ cmp_1_dict = todo_node_as_dict(node_0)
+ self.assertEqual(cmp_0_dict, cmp_1_dict)
def test_Todo_create_with_children(self) -> None:
- """Test parenthood guaranteeds of Todo.create_with_children."""
+ """Test parenthood guarantees of Todo.create_with_children."""
assert isinstance(self.proc.id_, int)
proc2 = Process(None)
proc2.save(self.db_conn)
self.assertEqual(len(todo_3.children), 1)
self.assertEqual(todo_3.children[0].process, proc4)
- def test_Todo_remove(self) -> None:
- """Test removal."""
- todo_1 = Todo(None, self.proc, False, self.date1)
- todo_1.save(self.db_conn)
- assert todo_1.id_ is not None
- todo_0 = Todo(None, self.proc, False, self.date1)
- todo_0.save(self.db_conn)
- todo_0.add_child(todo_1)
- todo_2 = Todo(None, self.proc, False, self.date1)
- todo_2.save(self.db_conn)
- todo_1.add_child(todo_2)
- todo_1_id = todo_1.id_
- todo_1.remove(self.db_conn)
- with self.assertRaises(NotFoundException):
- Todo.by_id(self.db_conn, todo_1_id)
- self.assertEqual(todo_0.children, [])
- self.assertEqual(todo_2.parents, [])
- todo_2.comment = 'foo'
- with self.assertRaises(HandledException):
- todo_2.remove(self.db_conn)
- todo_2.comment = ''
- todo_2.effort = 5
- with self.assertRaises(HandledException):
- todo_2.remove(self.db_conn)
-
- def test_Todo_autoremoval(self) -> None:
- """"Test automatic removal for Todo.effort < 0."""
- todo_1 = Todo(None, self.proc, False, self.date1)
- todo_1.save(self.db_conn)
- todo_1.comment = 'foo'
- todo_1.effort = -0.1
- todo_1.save(self.db_conn)
- assert todo_1.id_ is not None
- Todo.by_id(self.db_conn, todo_1.id_)
- todo_1.comment = ''
- todo_1_id = todo_1.id_
- todo_1.save(self.db_conn)
- with self.assertRaises(NotFoundException):
- Todo.by_id(self.db_conn, todo_1_id)
-
class TestsWithServer(TestCaseWithServer):
"""Tests against our HTTP server/handler (and database)."""
- def test_do_POST_day(self) -> None:
- """Test Todo posting of POST /day."""
- self.post_process()
- self.post_process(2)
- proc = Process.by_id(self.db_conn, 1)
- proc2 = Process.by_id(self.db_conn, 2)
- form_data = {'day_comment': '', 'make_type': 'full'}
- self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
- self.assertEqual(Todo.by_date(self.db_conn, '2024-01-01'), [])
- proc = Process.by_id(self.db_conn, 1)
- form_data['new_todo'] = str(proc.id_)
- self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
- todos = Todo.by_date(self.db_conn, '2024-01-01')
- self.assertEqual(1, len(todos))
- todo1 = todos[0]
- self.assertEqual(todo1.id_, 1)
- proc = Process.by_id(self.db_conn, 1)
- self.assertEqual(todo1.process.id_, proc.id_)
- self.assertEqual(todo1.is_done, False)
- proc2 = Process.by_id(self.db_conn, 2)
- form_data['new_todo'] = str(proc2.id_)
- self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
- todos = Todo.by_date(self.db_conn, '2024-01-01')
- todo1 = todos[1]
- self.assertEqual(todo1.id_, 2)
- proc2 = Process.by_id(self.db_conn, 1)
- todo1 = Todo.by_date(self.db_conn, '2024-01-01')[0]
- self.assertEqual(todo1.id_, 1)
- self.assertEqual(todo1.process.id_, proc2.id_)
- self.assertEqual(todo1.is_done, False)
-
- def test_do_POST_todo(self) -> None:
- """Test POST /todo."""
- def post_and_reload(form_data: dict[str, object], status: int = 302,
- redir_url: str = '/todo?id=1') -> Todo:
- self.check_post(form_data, '/todo?id=1', status, redir_url)
- return Todo.by_date(self.db_conn, '2024-01-01')[0]
- # test minimum
- self.post_process()
- self.check_post({'day_comment': '', 'new_todo': 1,
- 'make_type': 'full'},
- '/day?date=2024-01-01&make_type=full', 302)
- # test posting to bad URLs
- self.check_post({}, '/todo=', 404)
- self.check_post({}, '/todo?id=', 404)
+ def setUp(self) -> None:
+ super().setUp()
+ self._proc1_form_data: Any = self.post_process(1)
+ self._date = '2024-01-01'
+
+ @classmethod
+ def GET_todo_dict(cls,
+ target_id: int,
+ todos: list[dict[str, object]],
+ processes: list[dict[str, object]],
+ process_steps: list[dict[str, object]] | None = None,
+ conditions: list[dict[str, object]] | None = None
+ ) -> dict[str, object]:
+ """Return JSON of GET /todo to expect."""
+ # pylint: disable=too-many-arguments
+ library = {'Todo': cls.as_refs(todos),
+ 'Process': cls.as_refs(processes)}
+ if process_steps:
+ library['ProcessStep'] = cls.as_refs(process_steps)
+ conditions = conditions if conditions else []
+ if conditions:
+ library['Condition'] = cls.as_refs(conditions)
+ return {'todo': target_id,
+ 'steps_todo_to_process': [],
+ 'adoption_candidates_for': {},
+ 'process_candidates': [p['id'] for p in processes],
+ 'todo_candidates': [t['id'] for t in todos
+ if t['id'] != target_id],
+ 'condition_candidates': [c['id'] for c in conditions],
+ '_library': library}
+
+ @staticmethod
+ def _step_as_dict(node_id: int,
+ children: list[dict[str, object]],
+ process: int | None = None,
+ todo: int | None = None,
+ fillable: bool = False,
+ ) -> dict[str, object]:
+ return {'node_id': node_id,
+ 'children': children,
+ 'process': process,
+ 'fillable': fillable,
+ 'todo': todo}
+
+ def _make_todo_via_day_post(self, proc_id: int) -> None:
+ payload = {'day_comment': '',
+ 'new_todo': proc_id,
+ 'make_type': 'empty'}
+ self.check_post(payload, f'/day?date={self._date}&make_type=empty')
+
+ def test_basic_fail_POST_todo(self) -> None:
+ """Test basic malformed/illegal POST /todo requests."""
+ # test we cannot just POST into non-existing Todo
+ self.check_post({}, '/todo', 404)
self.check_post({}, '/todo?id=FOO', 400)
self.check_post({}, '/todo?id=0', 404)
- # test posting naked entity
- todo1 = post_and_reload({})
- self.assertEqual(todo1.children, [])
- self.assertEqual(todo1.parents, [])
- self.assertEqual(todo1.is_done, False)
- # test posting doneness
- todo1 = post_and_reload({'done': ''})
- self.assertEqual(todo1.is_done, True)
- # test implicitly posting non-doneness
- todo1 = post_and_reload({})
- self.assertEqual(todo1.is_done, False)
- # test malformed adoptions
- self.check_post({'adopt': 'foo'}, '/todo?id=1', 400)
+ self.check_post({}, '/todo?id=1', 404)
+ # test malformed values on existing Todo
+ self._make_todo_via_day_post(1)
+ for name in [
+ 'adopt', 'effort', 'make_full', 'make_empty', 'step_filler',
+ 'conditions', 'disables', 'blockers', 'enables']:
+ self.check_post({name: 'x'}, '/todo?id=1', 400, '/todo')
+ for prefix in ['make_empty_', 'make_full_']:
+ for suffix in ['', 'x', '1.1']:
+ self.check_post({'step_filler': f'{prefix}{suffix}'},
+ '/todo?id=1', 400, '/todo')
+
+ def test_basic_POST_todo(self) -> None:
+ """Test basic POST /todo manipulations."""
+ self._make_todo_via_day_post(1)
+ # test posting naked entity at first changes nothing
+ todo_dict = self.todo_as_dict(1, 1)
+ proc_dict = self.proc_as_dict(**self._proc1_form_data)
+ expected = self.GET_todo_dict(1, [todo_dict], [proc_dict])
+ self.check_json_get('/todo?id=1', expected)
+ self.check_post({}, '/todo?id=1')
+ self.check_json_get('/todo?id=1', expected)
+ # test posting doneness, comment, calendarization, effort
+ todo_post = {'done': '', 'calendarize': '', 'comment': 'foo',
+ 'effort': 2.3}
+ todo_dict = self.todo_as_dict(1, 1, is_done=True, calendarize=True,
+ comment='foo', effort=2.3)
+ expected = self.GET_todo_dict(1, [todo_dict], [proc_dict])
+ self.check_post(todo_post, '/todo?id=1')
+ self.check_json_get('/todo?id=1', expected)
+ # test implicitly un-setting all of those except effort by empty post
+ self.check_post({}, '/todo?id=1')
+ todo_dict = self.todo_as_dict(1, 1, effort=2.3)
+ expected = self.GET_todo_dict(1, [todo_dict], [proc_dict])
+ self.check_json_get('/todo?id=1', expected)
+ # test empty effort post can be explicitly unset by "" post
+ self.check_post({'effort': ''}, '/todo?id=1')
+ todo_dict['effort'] = None
+ self.check_json_get('/todo?id=1', expected)
+ # test Condition posts
+ c1_post = {'title': 'foo', 'description': 'oof', 'is_active': False}
+ c2_post = {'title': 'bar', 'description': 'rab', 'is_active': True}
+ self.check_post(c1_post, '/condition', redir='/condition?id=1')
+ self.check_post(c2_post, '/condition', redir='/condition?id=2')
+ c1_dict = self.cond_as_dict(1, False, ['foo'], ['oof'])
+ c2_dict = self.cond_as_dict(2, True, ['bar'], ['rab'])
+ conditions = [c1_dict, c2_dict]
+ todo_post = {'conditions': [1], 'disables': [1],
+ 'blockers': [2], 'enables': [2]}
+ for k, v in todo_post.items():
+ todo_dict[k] = v
+ self.check_post(todo_post, '/todo?id=1')
+ expected = self.GET_todo_dict(1, [todo_dict], [proc_dict],
+ conditions=conditions)
+ self.check_json_get('/todo?id=1', expected)
+
+ def test_POST_todo_deletion(self) -> None:
+ """Test deletions via POST /todo."""
+ self._make_todo_via_day_post(1)
+ todo_dict = self.todo_as_dict(1, process_id=1)
+ proc_dict = self.proc_as_dict(**self._proc1_form_data)
+ expected = self.GET_todo_dict(1, [todo_dict], [proc_dict])
+ # test failure of deletion on non-existing Todo
+ self.check_post({'delete': ''}, '/todo?id=2', 404, '/')
+ # test deletion of existing Todo
+ self.check_post({'delete': ''}, '/todo?id=1', 302, '/')
+ self.check_get('/todo?id=1', 404)
+ # test deletion of adopted Todo
+ self._make_todo_via_day_post(1)
+ self._make_todo_via_day_post(1)
+ self.check_post({'adopt': 2}, '/todo?id=1')
+ self.check_post({'delete': ''}, '/todo?id=2', 302, '/')
+ self.check_json_get('/todo?id=1', expected)
+ # test deletion of adopting Todo
+ self._make_todo_via_day_post(1)
+ self.check_post({'adopt': 2}, '/todo?id=1')
+ self.check_post({'delete': ''}, '/todo?id=1', 302, '/')
+ todo_dict['id'] = 2
+ expected = self.GET_todo_dict(2, [todo_dict], [proc_dict])
+ self.check_json_get('/todo?id=2', expected)
+ # test cannot delete Todo with comment or effort
+ self.check_post({'comment': 'foo'}, '/todo?id=2')
+ self.check_post({'delete': ''}, '/todo?id=2', 500, '/')
+ self.check_post({'effort': 5}, '/todo?id=2')
+ self.check_post({'delete': ''}, '/todo?id=2', 500, '/')
+ # test deletion via effort < 0, but only once deletable
+ self.check_post({'effort': -1, 'comment': 'foo'}, '/todo?id=2')
+ todo_dict['comment'] = 'foo'
+ todo_dict['effort'] = -1
+ self.check_json_get('/todo?id=2', expected)
+ self.check_post({}, '/todo?id=2')
+ self.check_get('/todo?id=2', 404)
+
+ def test_POST_todo_adoption(self) -> None:
+ """Test adoption via POST /todo with "adopt"."""
+ # pylint: disable=too-many-locals
+ # pylint: disable=too-many-statements
+ # post two Todos to Day, have first adopt second
+ self._make_todo_via_day_post(1)
+ self._make_todo_via_day_post(1)
+ proc1_dict = self.proc_as_dict(**self._proc1_form_data)
+ todo1_dict = self.todo_as_dict(1, process_id=1, children=[2])
+ todo2_dict = self.todo_as_dict(2, process_id=1, parents=[1])
+ todos = [todo1_dict, todo2_dict]
+ expected = self.GET_todo_dict(1, todos, [proc1_dict])
+ expected['steps_todo_to_process'] = [self._step_as_dict(1, [], todo=2)]
+ self.check_post({'adopt': 2}, '/todo?id=1')
+ self.check_json_get('/todo?id=1', expected)
+ # test Todo un-adopting by just not sending an adopt
+ self.check_post({}, '/todo?id=1')
+ todo1_dict['children'] = []
+ todo2_dict['parents'] = []
+ expected['steps_todo_to_process'] = []
+ self.check_json_get('/todo?id=1', expected)
+ # test fail on trying to adopt non-existing Todo
+ self.check_post({'adopt': 3}, '/todo?id=1', 404)
+ # test cannot self-adopt
self.check_post({'adopt': 1}, '/todo?id=1', 400)
- self.check_post({'adopt': 2}, '/todo?id=1', 404)
- # test posting second todo of same process
- self.check_post({'day_comment': '', 'new_todo': 1,
- 'make_type': 'full'},
- '/day?date=2024-01-01&make_type=full', 302)
- # test todo 1 adopting todo 2
- todo1 = post_and_reload({'adopt': 2})
- todo2 = Todo.by_date(self.db_conn, '2024-01-01')[1]
- self.assertEqual(todo1.children, [todo2])
- self.assertEqual(todo1.parents, [])
- self.assertEqual(todo2.children, [])
- self.assertEqual(todo2.parents, [todo1])
- # test todo1 cannot be set done with todo2 not done yet
- todo1 = post_and_reload({'done': '', 'adopt': 2}, 400)
- self.assertEqual(todo1.is_done, False)
- # test todo1 un-adopting todo 2 by just not sending an adopt
- todo1 = post_and_reload({}, 302)
- todo2 = Todo.by_date(self.db_conn, '2024-01-01')[1]
- self.assertEqual(todo1.children, [])
- self.assertEqual(todo1.parents, [])
- self.assertEqual(todo2.children, [])
- self.assertEqual(todo2.parents, [])
- # test todo1 deletion
- todo1 = post_and_reload({'delete': ''}, 302, '/')
+ # test cannot do 1-step circular adoption
+ self.check_post({'adopt': 1}, '/todo?id=2')
+ todo1_dict['parents'] = [2]
+ todo2_dict['children'] = [1]
+ self.check_post({'adopt': 2}, '/todo?id=1', 400)
+ # test cannot do 2-step circular adoption
+ self._make_todo_via_day_post(1)
+ self.check_post({'adopt': 2}, '/todo?id=3')
+ todo3_dict = self.todo_as_dict(3, process_id=1, children=[2])
+ todo2_dict['parents'] = [3]
+ todos += [todo3_dict]
+ self.check_post({'adopt': 3}, '/todo?id=1', 400)
+ # test can adopt Todo into ProcessStep chain via its Process (with key
+ # 'step_filler' equivalent to single-element 'adopt' if intable)
+ proc_post = {'title': 'A', 'description': '', 'effort': 1.0}
+ self.post_process(3, proc_post)
+ self.post_process(2, proc_post)
+ self.post_process(1, self._proc1_form_data | {'new_top_step': [2, 3]})
+ self._make_todo_via_day_post(2)
+ self._make_todo_via_day_post(3)
+ self.check_post({'step_filler': 5, 'adopt': [4]}, '/todo?id=1')
+ proc3_dict = self.proc_as_dict(3)
+ proc2_dict = self.proc_as_dict(2)
+ proc1_dict['explicit_steps'] = [1, 2]
+ procs = [proc1_dict, proc2_dict, proc3_dict]
+ procsteps = [self.procstep_as_dict(1, 1, 2),
+ self.procstep_as_dict(2, 1, 3)]
+ todo1_dict['children'] = [4, 5]
+ todo4_dict = self.todo_as_dict(4, process_id=2, parents=[1])
+ todo5_dict = self.todo_as_dict(5, process_id=3, parents=[1])
+ todos += [todo4_dict, todo5_dict]
+ expected = self.GET_todo_dict(1, todos, procs, procsteps)
+ step_proc2 = self._step_as_dict(1, [], 2, 4, True)
+ step_proc3 = self._step_as_dict(2, [], 3, 5, True)
+ expected['steps_todo_to_process'] = [step_proc2, step_proc3]
+ self.check_json_get('/todo?id=1', expected)
+ # test 'ignore' values for 'step_filler' are ignored, and intable
+ # 'step_filler' values are interchangeable with those of 'adopt'
+ todo_post = {'adopt': 5, 'step_filler': ['ignore', 4]}
+ self.check_post(todo_post, '/todo?id=1')
+ self.check_json_get('/todo?id=1', expected)
+ # test cannot adopt into non-top-level elements of chain
+ self.post_process(4, proc_post)
+ self.post_process(3, proc_post | {'new_top_step': 4, 'step_of': [1]})
+ proc4_dict = self.proc_as_dict(4)
+ proc3_dict['explicit_steps'] = [3]
+ procs += [proc4_dict]
+ procsteps += [self.procstep_as_dict(3, 3, 4)]
+ step_proc4 = self._step_as_dict(3, [], 4, None, True)
+ step_proc3['children'] = [step_proc4]
+ self._make_todo_via_day_post(4)
+ self.check_post({'adopt': [4, 5, 6]}, '/todo?id=1')
+ todo6_dict = self.todo_as_dict(6, process_id=4, parents=[1])
+ todo1_dict['children'] = [4, 5, 6]
+ todos += [todo6_dict]
+ expected = self.GET_todo_dict(1, todos, procs, procsteps)
+ step2_proc4 = self._step_as_dict(4, [], None, 6, False)
+ expected['steps_todo_to_process'] = [step_proc2, step_proc3,
+ step2_proc4]
+ expected['adoption_candidates_for'] = {'4': [6]}
+ self.check_json_get('/todo?id=1', expected)
+
+ def test_POST_todo_make_full(self) -> None:
+ """Test creation and adoption via POST /todo with "make_full"."""
+ # pylint: disable=too-many-locals
+ # create chain of Processes
+ proc_post = {'title': 'A', 'description': '', 'effort': 1.0}
+ self.post_process(2, proc_post | {'new_top_step': 1})
+ self.post_process(3, proc_post | {'new_top_step': 2})
+ self.post_process(4, proc_post | {'new_top_step': 3})
+ proc1_dict = self.proc_as_dict(**self._proc1_form_data)
+ proc2_dict = self.proc_as_dict(2, explicit_steps=[1])
+ proc3_dict = self.proc_as_dict(3, explicit_steps=[2])
+ proc4_dict = self.proc_as_dict(4, explicit_steps=[3])
+ procs = [proc1_dict, proc2_dict, proc3_dict, proc4_dict]
+ procsteps = [self.procstep_as_dict(1, 2, 1),
+ self.procstep_as_dict(2, 3, 2),
+ self.procstep_as_dict(3, 4, 3)]
+ # post (childless) Todo of chain end, then make_full on next in line
+ self._make_todo_via_day_post(4)
+ todo1_dict = self.todo_as_dict(1, 4, children=[2])
+ todo2_dict = self.todo_as_dict(2, 3, children=[3], parents=[1])
+ todo3_dict = self.todo_as_dict(3, 2, parents=[2], children=[4])
+ todo4_dict = self.todo_as_dict(4, 1, parents=[3])
+ todos = [todo1_dict, todo2_dict, todo3_dict, todo4_dict]
+ expected = self.GET_todo_dict(1, todos, procs, procsteps)
+ step_proc1 = self._step_as_dict(3, [], 1, 4, True)
+ step_proc2 = self._step_as_dict(2, [step_proc1], 2, 3, True)
+ step_proc3 = self._step_as_dict(1, [step_proc2], 3, 2, True)
+ expected['steps_todo_to_process'] = [step_proc3]
+ self.check_post({'step_filler': 'make_full_3'}, '/todo?id=1')
+ self.check_json_get('/todo?id=1', expected)
+ # make new chain next to expected, find steps_todo_to_process extended,
+ # expect existing Todo demanded by new chain be adopted into new chain
+ self.check_post({'make_full': 2, 'adopt': [2]}, '/todo?id=1')
+ todo5_dict = self.todo_as_dict(5, 2, parents=[1], children=[4])
+ todo1_dict['children'] = [2, 5]
+ todo4_dict['parents'] = [3, 5]
+ todos += [todo5_dict]
+ step2_proc1 = self._step_as_dict(5, [], None, 4)
+ step2_proc2 = self._step_as_dict(4, [step2_proc1], None, 5)
+ expected = self.GET_todo_dict(1, todos, procs, procsteps)
+ expected['steps_todo_to_process'] = [step_proc3, step2_proc2]
+ self.check_json_get('/todo?id=1', expected)
+ # fail on trying to call make_full on non-existing Process
+ self.check_post({'make_full': 5}, '/todo?id=1', 404)
+
+ def test_POST_todo_make_empty(self) -> None:
+ """Test creation and adoption via POST /todo with "make_empty"."""
+ # pylint: disable=too-many-locals
+ # create chain of Processes
+ proc_post = {'title': 'A', 'description': '', 'effort': 1.0}
+ self.post_process(2, proc_post | {'new_top_step': 1})
+ self.post_process(3, proc_post | {'new_top_step': 2})
+ self.post_process(4, proc_post | {'new_top_step': 3})
+ proc1_dict = self.proc_as_dict(**self._proc1_form_data)
+ proc2_dict = self.proc_as_dict(2, explicit_steps=[1])
+ proc3_dict = self.proc_as_dict(3, explicit_steps=[2])
+ proc4_dict = self.proc_as_dict(4, explicit_steps=[3])
+ procs = [proc1_dict, proc2_dict, proc3_dict, proc4_dict]
+ procsteps = [self.procstep_as_dict(1, 2, 1),
+ self.procstep_as_dict(2, 3, 2),
+ self.procstep_as_dict(3, 4, 3)]
+ # post (childless) Todo of chain end, then make empty on next in line
+ self._make_todo_via_day_post(4)
+ todo1_dict = self.todo_as_dict(1, 4, children=[2])
+ todo2_dict = self.todo_as_dict(2, 3, parents=[1])
+ todos = [todo1_dict, todo2_dict]
+ expected = self.GET_todo_dict(1, todos, procs, procsteps)
+ step_proc1 = self._step_as_dict(3, [], 1, None)
+ step_proc2 = self._step_as_dict(2, [step_proc1], 2, None, True)
+ step_proc3 = self._step_as_dict(1, [step_proc2], 3, 2, True)
+ expected['steps_todo_to_process'] = [step_proc3]
+ expected['adoption_candidates_for'] = {'1': [], '2': []}
+ self.check_post({'step_filler': 'make_empty_3'}, '/todo?id=1')
+ self.check_json_get('/todo?id=1', expected)
+ # make new top-level Todo without chain implied by its Process
+ self.check_post({'make_empty': 2, 'adopt': [2]}, '/todo?id=1')
+ todo3_dict = self.todo_as_dict(3, 2, parents=[1], children=[])
+ todo1_dict['children'] = [2, 3]
+ todos += [todo3_dict]
+ step2_proc2 = self._step_as_dict(4, [], None, 3)
+ expected = self.GET_todo_dict(1, todos, procs, procsteps)
+ expected['steps_todo_to_process'] = [step_proc3, step2_proc2]
+ expected['adoption_candidates_for'] = {'1': [], '2': [3]}
+ self.check_json_get('/todo?id=1', expected)
+ # fail on trying to call make_empty on non-existing Process
+ self.check_post({'make_full': 5}, '/todo?id=1', 404)
+
+ def test_do_GET_todo(self) -> None:
+ """Test GET /todo response codes."""
+ self._make_todo_via_day_post(1)
+ # test malformed or illegal parameter values
+ self.check_get('/todo', 404)
+ self.check_get('/todo?id=', 404)
+ self.check_get('/todo?id=foo', 400)
+ self.check_get('/todo?id=0', 404)
+ self.check_get('/todo?id=2', 404)
+ # test all existing Processes are shown as available
+ proc_post = {'title': 'A', 'description': '', 'effort': 1.0}
+ self.post_process(2, proc_post)
+ todo1_dict = self.todo_as_dict(1, process_id=1)
+ proc1_dict = self.proc_as_dict(1, **self._proc1_form_data)
+ proc2_dict = self.proc_as_dict(2)
+ procs = [proc1_dict, proc2_dict]
+ expected = self.GET_todo_dict(1, [todo1_dict], procs)
+ self.check_json_get('/todo?id=1', expected)
+ # test chain of Processes shown as potential step nodes
+ self.post_process(2, proc_post)
+ self.post_process(3, proc_post)
+ self.post_process(4, proc_post)
+ self.post_process(1, self._proc1_form_data | {'new_top_step': 2})
+ self.post_process(2, proc_post | {'new_top_step': 3, 'step_of': [1]})
+ self.post_process(3, proc_post | {'new_top_step': 4, 'step_of': [2]})
+ proc1_dict['explicit_steps'] = [1]
+ proc2_dict['explicit_steps'] = [2]
+ proc3_dict = self.proc_as_dict(3, explicit_steps=[3])
+ proc4_dict = self.proc_as_dict(4)
+ procs += [proc3_dict, proc4_dict]
+ procsteps = [self.procstep_as_dict(1, 1, 2, None),
+ self.procstep_as_dict(2, 2, 3, None),
+ self.procstep_as_dict(3, 3, 4, None)]
+ expected = self.GET_todo_dict(1, [todo1_dict], procs, procsteps)
+ step_proc4 = self._step_as_dict(3, [], 4)
+ step_proc3 = self._step_as_dict(2, [step_proc4], 3)
+ step_proc2 = self._step_as_dict(1, [step_proc3], 2, fillable=True)
+ expected['steps_todo_to_process'] = [step_proc2]
+ expected['adoption_candidates_for'] = {'2': [], '3': [], '4': []}
+ self.check_json_get('/todo?id=1', expected)
+ # test display of parallel chains
+ proc_steps_post = {'new_top_step': 4, 'keep_step': [1],
+ 'step_1_process_id': 2, 'steps': [1, 4]}
+ self.post_process(1, self._proc1_form_data | proc_steps_post)
+ proc1_dict['explicit_steps'] = [1, 4]
+ step2_proc4 = self._step_as_dict(4, [], 4, fillable=True)
+ procsteps += [self.procstep_as_dict(4, 1, 4, None)]
+ expected = self.GET_todo_dict(1, [todo1_dict], procs, procsteps)
+ expected['steps_todo_to_process'] = [step_proc2, step2_proc4]
+ expected['adoption_candidates_for'] = {'2': [], '3': [], '4': []}
+ self.check_json_get('/todo?id=1', expected)
+
+ def test_do_POST_doneness_relations(self) -> None:
+ """Test Todo.is_done Condition, adoption relations for /todo POSTs."""
+ # test Todo with adoptee can only be set done if adoptee is done too
+ self._make_todo_via_day_post(1)
+ self._make_todo_via_day_post(1)
+ self.check_post({'adopt': 2, 'done': ''}, '/todo?id=1', 400)
+ self.check_post({'done': ''}, '/todo?id=2')
+ self.check_post({'adopt': 2, 'done': ''}, '/todo?id=1', 302)
+ # test Todo cannot be set undone with adopted Todo not done yet
+ self.check_post({}, '/todo?id=2')
+ self.check_post({'adopt': 2}, '/todo?id=1', 400)
+ # test unadoption relieves block
+ self.check_post({}, '/todo?id=1', 302)
+ # test Condition being set or unset can block doneness setting
+ c1_post = {'title': '', 'description': '', 'is_active': False}
+ c2_post = {'title': '', 'description': '', 'is_active': True}
+ self.check_post(c1_post, '/condition', redir='/condition?id=1')
+ self.check_post(c2_post, '/condition', redir='/condition?id=2')
+ self.check_post({'conditions': [1], 'done': ''}, '/todo?id=1', 400)
+ self.check_post({'done': ''}, '/todo?id=1', 302)
+ self.check_post({'blockers': [2]}, '/todo?id=1', 400)
+ self.check_post({'done': ''}, '/todo?id=1', 302)
+ # test setting Todo doneness can set/un-set Conditions, but only on
+ # doneness change, not by mere passive state
+ self.check_post({'enables': [1], 'done': ''}, '/todo?id=1')
+ self.check_post({'conditions': [1], 'done': ''}, '/todo?id=2', 400)
+ self.check_post({'enables': [1]}, '/todo?id=1')
+ self.check_post({'enables': [1], 'done': ''}, '/todo?id=1')
+ self.check_post({'conditions': [1], 'done': ''}, '/todo?id=2')
+ self.check_post({'blockers': [1]}, '/todo?id=2', 400)
+ self.check_post({'disables': [1], 'done': ''}, '/todo?id=1')
+ self.check_post({'blockers': [1]}, '/todo?id=2', 400)
+ self.check_post({'disables': [1]}, '/todo?id=1')
+ self.check_post({'disables': [1], 'done': ''}, '/todo?id=1')
+ self.check_post({'blockers': [1]}, '/todo?id=2')
def test_do_POST_day_todo_adoption(self) -> None:
"""Test Todos posted to Day view may adopt existing Todos."""
- form_data = self.post_process()
- form_data = self.post_process(2, form_data | {'new_top_step': 1})
+ form_data = self.post_process(
+ 2, self._proc1_form_data | {'new_top_step': 1})
form_data = {'day_comment': '', 'new_todo': 1, 'make_type': 'full'}
self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
form_data['new_todo'] = 2
self.assertEqual(todo2.children, [todo1])
self.assertEqual(todo2.parents, [])
- def test_do_POST_day_todo_multiple(self) -> None:
- """Test multiple Todos can be posted to Day view."""
- form_data = self.post_process()
- form_data = self.post_process(2)
- form_data = {'day_comment': '', 'new_todo': [1, 2],
- 'make_type': 'full'}
- self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
- todo1 = Todo.by_date(self.db_conn, '2024-01-01')[0]
- todo2 = Todo.by_date(self.db_conn, '2024-01-01')[1]
- self.assertEqual(todo1.process.id_, 1)
- self.assertEqual(todo2.process.id_, 2)
-
def test_do_POST_day_todo_multiple_inner_adoption(self) -> None:
"""Test multiple Todos can be posted to Day view w. inner adoption."""
self.assertEqual(todo3.children, [])
self.assertEqual(sorted(todo3.parents), sorted([todo2, todo1]))
- form_data = self.post_process()
- form_data = self.post_process(2, form_data | {'new_top_step': 1})
+ self.post_process(2, self._proc1_form_data | {'new_top_step': 1})
check_adoption('2024-01-01', [1, 2])
check_adoption('2024-01-02', [2, 1])
check_nesting_adoption(3, '2024-01-03', [1, 2])
def test_do_POST_day_todo_doneness(self) -> None:
"""Test Todo doneness can be posted to Day view."""
- self.post_process()
form_data = {'day_comment': '', 'new_todo': [1], 'make_type': 'full'}
self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
todo = Todo.by_date(self.db_conn, '2024-01-01')[0]
self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
todo = Todo.by_date(self.db_conn, '2024-01-01')[0]
self.assertEqual(todo.is_done, True)
-
- def test_do_GET_todo(self) -> None:
- """Test GET /todo response codes."""
- self.post_process()
- form_data = {'day_comment': '', 'new_todo': 1, 'make_type': 'full'}
- self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
- self.check_get('/todo', 404)
- self.check_get('/todo?id=', 404)
- self.check_get('/todo?id=foo', 400)
- self.check_get('/todo?id=0', 404)
- self.check_get('/todo?id=1', 200)
from typing import Mapping, Any, Callable
from threading import Thread
from http.client import HTTPConnection
+from datetime import datetime, timedelta
+from time import sleep
from json import loads as json_loads
from urllib.parse import urlencode
from uuid import uuid4
from plomtask.processes import Process, ProcessStep
from plomtask.conditions import Condition
from plomtask.days import Day
+from plomtask.dating import DATE_FORMAT
from plomtask.todos import Todo
+from plomtask.versioned_attributes import VersionedAttribute, TIMESTAMP_FMT
from plomtask.exceptions import NotFoundException, HandledException
-def _within_checked_class(f: Callable[..., None]) -> Callable[..., None]:
- def wrapper(self: TestCase) -> None:
- if hasattr(self, 'checked_class'):
- f(self)
- return wrapper
+VERSIONED_VALS: dict[str,
+ list[str] | list[float]] = {'str': ['A', 'B'],
+ 'float': [0.3, 1.1]}
-class TestCaseSansDB(TestCase):
- """Tests requiring no DB setup."""
+class TestCaseAugmented(TestCase):
+ """Tester core providing helpful basic internal decorators and methods."""
checked_class: Any
- default_init_args: list[Any] = []
- versioned_defaults_to_test: dict[str, str | float] = {}
- legal_ids = [1, 5]
- illegal_ids = [0]
+ default_init_kwargs: dict[str, Any] = {}
+
+ @staticmethod
+ def _run_if_checked_class(f: Callable[..., None]) -> Callable[..., None]:
+ def wrapper(self: TestCase) -> None:
+ if hasattr(self, 'checked_class'):
+ f(self)
+ return wrapper
+
+ @classmethod
+ def _run_on_versioned_attributes(cls,
+ f: Callable[..., None]
+ ) -> Callable[..., None]:
+ @cls._run_if_checked_class
+ def wrapper(self: TestCase) -> None:
+ assert isinstance(self, TestCaseAugmented)
+ for attr_name in self.checked_class.to_save_versioned():
+ default = self.checked_class.versioned_defaults[attr_name]
+ owner = self.checked_class(None, **self.default_init_kwargs)
+ attr = getattr(owner, attr_name)
+ to_set = VERSIONED_VALS[attr.value_type_name]
+ f(self, owner, attr_name, attr, default, to_set)
+ return wrapper
+
+ @classmethod
+ def _make_from_defaults(cls, id_: float | str | None) -> Any:
+ return cls.checked_class(id_, **cls.default_init_kwargs)
+
+
+class TestCaseSansDB(TestCaseAugmented):
+ """Tests requiring no DB setup."""
+ legal_ids: list[str] | list[int] = [1, 5]
+ illegal_ids: list[str] | list[int] = [0]
- @_within_checked_class
+ @TestCaseAugmented._run_if_checked_class
def test_id_validation(self) -> None:
"""Test .id_ validation/setting."""
for id_ in self.illegal_ids:
with self.assertRaises(HandledException):
- self.checked_class(id_, *self.default_init_args)
+ self._make_from_defaults(id_)
for id_ in self.legal_ids:
- obj = self.checked_class(id_, *self.default_init_args)
+ obj = self._make_from_defaults(id_)
self.assertEqual(obj.id_, id_)
- @_within_checked_class
- def test_versioned_defaults(self) -> None:
- """Test defaults of VersionedAttributes."""
- id_ = self.legal_ids[0]
- obj = self.checked_class(id_, *self.default_init_args)
- for k, v in self.versioned_defaults_to_test.items():
- self.assertEqual(getattr(obj, k).newest, v)
-
-
-class TestCaseWithDB(TestCase):
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_versioned_set(self,
+ _: Any,
+ __: str,
+ attr: VersionedAttribute,
+ default: str | float,
+ to_set: list[str] | list[float]
+ ) -> None:
+ """Test VersionedAttribute.set() behaves as expected."""
+ attr.set(default)
+ self.assertEqual(list(attr.history.values()), [default])
+ # check same value does not get set twice in a row,
+ # and that not even its timestamp get updated
+ timestamp = list(attr.history.keys())[0]
+ attr.set(default)
+ self.assertEqual(list(attr.history.values()), [default])
+ self.assertEqual(list(attr.history.keys())[0], timestamp)
+ # check that different value _will_ be set/added
+ attr.set(to_set[0])
+ timesorted_vals = [attr.history[t] for
+ t in sorted(attr.history.keys())]
+ expected = [default, to_set[0]]
+ self.assertEqual(timesorted_vals, expected)
+ # check that a previously used value can be set if not most recent
+ attr.set(default)
+ timesorted_vals = [attr.history[t] for
+ t in sorted(attr.history.keys())]
+ expected = [default, to_set[0], default]
+ self.assertEqual(timesorted_vals, expected)
+ # again check for same value not being set twice in a row, even for
+ # later items
+ attr.set(to_set[1])
+ timesorted_vals = [attr.history[t] for
+ t in sorted(attr.history.keys())]
+ expected = [default, to_set[0], default, to_set[1]]
+ self.assertEqual(timesorted_vals, expected)
+ attr.set(to_set[1])
+ self.assertEqual(timesorted_vals, expected)
+
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_versioned_newest(self,
+ _: Any,
+ __: str,
+ attr: VersionedAttribute,
+ default: str | float,
+ to_set: list[str] | list[float]
+ ) -> None:
+ """Test VersionedAttribute.newest."""
+ # check .newest on empty history returns .default
+ self.assertEqual(attr.newest, default)
+ # check newest element always returned
+ for v in [to_set[0], to_set[1]]:
+ attr.set(v)
+ self.assertEqual(attr.newest, v)
+ # check newest element returned even if also early value
+ attr.set(default)
+ self.assertEqual(attr.newest, default)
+
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_versioned_at(self,
+ _: Any,
+ __: str,
+ attr: VersionedAttribute,
+ default: str | float,
+ to_set: list[str] | list[float]
+ ) -> None:
+ """Test .at() returns values nearest to queried time, or default."""
+ # check .at() return default on empty history
+ timestamp_a = datetime.now().strftime(TIMESTAMP_FMT)
+ self.assertEqual(attr.at(timestamp_a), default)
+ # check value exactly at timestamp returned
+ attr.set(to_set[0])
+ timestamp_b = list(attr.history.keys())[0]
+ self.assertEqual(attr.at(timestamp_b), to_set[0])
+ # check earliest value returned if exists, rather than default
+ self.assertEqual(attr.at(timestamp_a), to_set[0])
+ # check reverts to previous value for timestamps not indexed
+ sleep(0.00001)
+ timestamp_between = datetime.now().strftime(TIMESTAMP_FMT)
+ sleep(0.00001)
+ attr.set(to_set[1])
+ timestamp_c = sorted(attr.history.keys())[-1]
+ self.assertEqual(attr.at(timestamp_c), to_set[1])
+ self.assertEqual(attr.at(timestamp_between), to_set[0])
+ sleep(0.00001)
+ timestamp_after_c = datetime.now().strftime(TIMESTAMP_FMT)
+ self.assertEqual(attr.at(timestamp_after_c), to_set[1])
+
+
+class TestCaseWithDB(TestCaseAugmented):
"""Module tests not requiring DB setup."""
- checked_class: Any
- default_ids: tuple[int | str, int | str, int | str] = (1, 2, 3)
- default_init_kwargs: dict[str, Any] = {}
- test_versioneds: dict[str, type] = {}
+ default_ids: tuple[int, int, int] | tuple[str, str, str] = (1, 2, 3)
def setUp(self) -> None:
Condition.empty_cache()
return db_found
def _change_obj(self, obj: object) -> str:
- attr_name: str = self.checked_class.to_save[-1]
+ attr_name: str = self.checked_class.to_save_simples[-1]
attr = getattr(obj, attr_name)
new_attr: str | int | float | bool
if isinstance(attr, (int, float)):
hashes_db_found = [hash(x) for x in db_found]
self.assertEqual(sorted(hashes_content), sorted(hashes_db_found))
- @_within_checked_class
- def test_saving_versioned(self) -> None:
+ def check_by_date_range_with_limits(self,
+ date_col: str,
+ set_id_field: bool = True
+ ) -> None:
+ """Test .by_date_range_with_limits."""
+ # pylint: disable=too-many-locals
+ f = self.checked_class.by_date_range_with_limits
+ # check illegal ranges
+ legal_range = ('yesterday', 'tomorrow')
+ for i in [0, 1]:
+ for bad_date in ['foo', '2024-02-30', '2024-01-01 12:00:00']:
+ date_range = list(legal_range[:])
+ date_range[i] = bad_date
+ with self.assertRaises(HandledException):
+ f(self.db_conn, date_range, date_col)
+ # check empty, translation of 'yesterday' and 'tomorrow'
+ items, start, end = f(self.db_conn, legal_range, date_col)
+ self.assertEqual(items, [])
+ yesterday = datetime.now() + timedelta(days=-1)
+ tomorrow = datetime.now() + timedelta(days=+1)
+ self.assertEqual(start, yesterday.strftime(DATE_FORMAT))
+ self.assertEqual(end, tomorrow.strftime(DATE_FORMAT))
+ # prepare dated items for non-empty results
+ kwargs_with_date = self.default_init_kwargs.copy()
+ if set_id_field:
+ kwargs_with_date['id_'] = None
+ objs = []
+ dates = ['2024-01-01', '2024-01-02', '2024-01-04']
+ for date in ['2024-01-01', '2024-01-02', '2024-01-04']:
+ kwargs_with_date['date'] = date
+ obj = self.checked_class(**kwargs_with_date)
+ objs += [obj]
+ # check ranges still empty before saving
+ date_range = [dates[0], dates[-1]]
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], [])
+ # check all objs displayed within closed interval
+ for obj in objs:
+ obj.save(self.db_conn)
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], objs)
+ # check that only displayed what exists within interval
+ date_range = ['2023-12-20', '2024-01-03']
+ expected = [objs[0], objs[1]]
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], expected)
+ date_range = ['2024-01-03', '2024-01-30']
+ expected = [objs[2]]
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], expected)
+ # check that inverted interval displays nothing
+ date_range = [dates[-1], dates[0]]
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], [])
+ # check that "today" is interpreted, and single-element interval
+ today_date = datetime.now().strftime(DATE_FORMAT)
+ kwargs_with_date['date'] = today_date
+ obj_today = self.checked_class(**kwargs_with_date)
+ obj_today.save(self.db_conn)
+ date_range = ['today', 'today']
+ items, start, end = f(self.db_conn, date_range, date_col)
+ self.assertEqual(start, today_date)
+ self.assertEqual(start, end)
+ self.assertEqual(items, [obj_today])
+
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_saving_versioned_attributes(self,
+ owner: Any,
+ attr_name: str,
+ attr: VersionedAttribute,
+ _: str | float,
+ to_set: list[str] | list[float]
+ ) -> None:
"""Test storage and initialization of versioned attributes."""
- def retrieve_attr_vals() -> list[object]:
+
+ def retrieve_attr_vals(attr: VersionedAttribute) -> list[object]:
attr_vals_saved: list[object] = []
- assert hasattr(retrieved, 'id_')
for row in self.db_conn.row_where(attr.table_name, 'parent',
- retrieved.id_):
+ owner.id_):
attr_vals_saved += [row[2]]
return attr_vals_saved
- for attr_name, type_ in self.test_versioneds.items():
- # fail saving attributes on non-saved owner
- owner = self.checked_class(None, **self.default_init_kwargs)
- vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1]
- attr = getattr(owner, attr_name)
- attr.set(vals[0])
- attr.set(vals[1])
- with self.assertRaises(NotFoundException):
- attr.save(self.db_conn)
- owner.save(self.db_conn)
- # check stored attribute is as expected
- retrieved = self._load_from_db(owner.id_)[0]
- attr = getattr(retrieved, attr_name)
- self.assertEqual(sorted(attr.history.values()), vals)
- # check owner.save() created entries in attr table
- attr_vals_saved = retrieve_attr_vals()
- self.assertEqual(vals, attr_vals_saved)
- # check setting new val to attr inconsequential to DB without save
- attr.set(vals[0])
- attr_vals_saved = retrieve_attr_vals()
- self.assertEqual(vals, attr_vals_saved)
- # check save finally adds new val
- attr.save(self.db_conn)
- attr_vals_saved = retrieve_attr_vals()
- self.assertEqual(vals + [vals[0]], attr_vals_saved)
- @_within_checked_class
+ attr.set(to_set[0])
+ # check that without attr.save() no rows in DB
+ rows = self.db_conn.row_where(attr.table_name, 'parent', owner.id_)
+ self.assertEqual([], rows)
+ # fail saving attributes on non-saved owner
+ with self.assertRaises(NotFoundException):
+ attr.save(self.db_conn)
+ # check owner.save() created entries as expected in attr table
+ owner.save(self.db_conn)
+ attr_vals_saved = retrieve_attr_vals(attr)
+ self.assertEqual([to_set[0]], attr_vals_saved)
+ # check changing attr val without save affects owner in memory …
+ attr.set(to_set[1])
+ cmp_attr = getattr(owner, attr_name)
+ self.assertEqual(to_set, list(cmp_attr.history.values()))
+ self.assertEqual(cmp_attr.history, attr.history)
+ # … but does not yet affect DB
+ attr_vals_saved = retrieve_attr_vals(attr)
+ self.assertEqual([to_set[0]], attr_vals_saved)
+ # check individual attr.save also stores new val to DB
+ attr.save(self.db_conn)
+ attr_vals_saved = retrieve_attr_vals(attr)
+ self.assertEqual(to_set, attr_vals_saved)
+
+ @TestCaseAugmented._run_if_checked_class
def test_saving_and_caching(self) -> None:
"""Test effects of .cache() and .save()."""
id1 = self.default_ids[0]
# check failure to cache without ID (if None-ID input possible)
if isinstance(id1, int):
- obj0 = self.checked_class(None, **self.default_init_kwargs)
+ obj0 = self._make_from_defaults(None)
with self.assertRaises(HandledException):
obj0.cache()
# check mere object init itself doesn't even store in cache
- obj1 = self.checked_class(id1, **self.default_init_kwargs)
+ obj1 = self._make_from_defaults(id1)
self.assertEqual(self.checked_class.get_cache(), {})
# check .cache() fills cache, but not DB
obj1.cache()
self.assertEqual(self.checked_class.get_cache(), {id1: obj1})
- db_found = self._load_from_db(id1)
- self.assertEqual(db_found, [])
+ found_in_db = self._load_from_db(id1)
+ self.assertEqual(found_in_db, [])
# check .save() sets ID (for int IDs), updates cache, and fills DB
# (expect ID to be set to id1, despite obj1 already having that as ID:
# it's generated by cursor.lastrowid on the DB table, and with obj1
# not written there, obj2 should get it first!)
id_input = None if isinstance(id1, int) else id1
- obj2 = self.checked_class(id_input, **self.default_init_kwargs)
+ obj2 = self._make_from_defaults(id_input)
obj2.save(self.db_conn)
- obj2_hash = hash(obj2)
self.assertEqual(self.checked_class.get_cache(), {id1: obj2})
- db_found += self._load_from_db(id1)
- self.assertEqual([hash(o) for o in db_found], [obj2_hash])
+ # NB: we'll only compare hashes because obj2 itself disappears on
+ # .from_table_row-trioggered database reload
+ obj2_hash = hash(obj2)
+ found_in_db += self._load_from_db(id1)
+ self.assertEqual([hash(o) for o in found_in_db], [obj2_hash])
# check we cannot overwrite obj2 with obj1 despite its same ID,
# since it has disappeared now
with self.assertRaises(HandledException):
obj1.save(self.db_conn)
- @_within_checked_class
+ @TestCaseAugmented._run_if_checked_class
def test_by_id(self) -> None:
"""Test .by_id()."""
id1, id2, _ = self.default_ids
# check failure if not yet saved
- obj1 = self.checked_class(id1, **self.default_init_kwargs)
+ obj1 = self._make_from_defaults(id1)
with self.assertRaises(NotFoundException):
self.checked_class.by_id(self.db_conn, id1)
# check identity of cached and retrieved
obj1.cache()
self.assertEqual(obj1, self.checked_class.by_id(self.db_conn, id1))
# check identity of saved and retrieved
- obj2 = self.checked_class(id2, **self.default_init_kwargs)
+ obj2 = self._make_from_defaults(id2)
obj2.save(self.db_conn)
self.assertEqual(obj2, self.checked_class.by_id(self.db_conn, id2))
- @_within_checked_class
+ @TestCaseAugmented._run_if_checked_class
def test_by_id_or_create(self) -> None:
"""Test .by_id_or_create."""
# check .by_id_or_create fails if wrong class
self.checked_class.by_id(self.db_conn, item.id_)
self.assertEqual(self.checked_class(item.id_), item)
- @_within_checked_class
+ @TestCaseAugmented._run_if_checked_class
def test_from_table_row(self) -> None:
"""Test .from_table_row() properly reads in class directly from DB."""
id_ = self.default_ids[0]
- obj = self.checked_class(id_, **self.default_init_kwargs)
+ obj = self._make_from_defaults(id_)
obj.save(self.db_conn)
assert isinstance(obj.id_, type(id_))
for row in self.db_conn.row_where(self.checked_class.table_name,
'id', obj.id_):
# check .from_table_row reproduces state saved, no matter if obj
# later changed (with caching even)
+ # NB: we'll only compare hashes because obj itself disappears on
+ # .from_table_row-triggered database reload
hash_original = hash(obj)
attr_name = self._change_obj(obj)
obj.cache()
# check cache contains what .from_table_row just produced
self.assertEqual({retrieved.id_: retrieved},
self.checked_class.get_cache())
- # check .from_table_row also reads versioned attributes from DB
- for attr_name, type_ in self.test_versioneds.items():
- owner = self.checked_class(None)
- vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1]
- attr = getattr(owner, attr_name)
- attr.set(vals[0])
- attr.set(vals[1])
- owner.save(self.db_conn)
- for row in self.db_conn.row_where(owner.table_name, 'id',
+
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_versioned_history_from_row(self,
+ owner: Any,
+ _: str,
+ attr: VersionedAttribute,
+ default: str | float,
+ to_set: list[str] | list[float]
+ ) -> None:
+ """"Test VersionedAttribute.history_from_row() knows its DB rows."""
+ attr.set(to_set[0])
+ attr.set(to_set[1])
+ owner.save(self.db_conn)
+ # make empty VersionedAttribute, fill from rows, compare to owner's
+ for row in self.db_conn.row_where(owner.table_name, 'id', owner.id_):
+ loaded_attr = VersionedAttribute(owner, attr.table_name, default)
+ for row in self.db_conn.row_where(attr.table_name, 'parent',
owner.id_):
- retrieved = owner.__class__.from_table_row(self.db_conn, row)
- attr = getattr(retrieved, attr_name)
- self.assertEqual(sorted(attr.history.values()), vals)
+ loaded_attr.history_from_row(row)
+ self.assertEqual(len(attr.history.keys()),
+ len(loaded_attr.history.keys()))
+ for timestamp, value in attr.history.items():
+ self.assertEqual(value, loaded_attr.history[timestamp])
- @_within_checked_class
+ @TestCaseAugmented._run_if_checked_class
def test_all(self) -> None:
"""Test .all() and its relation to cache and savings."""
- id_1, id_2, id_3 = self.default_ids
- item1 = self.checked_class(id_1, **self.default_init_kwargs)
- item2 = self.checked_class(id_2, **self.default_init_kwargs)
- item3 = self.checked_class(id_3, **self.default_init_kwargs)
+ id1, id2, id3 = self.default_ids
+ item1 = self._make_from_defaults(id1)
+ item2 = self._make_from_defaults(id2)
+ item3 = self._make_from_defaults(id3)
# check .all() returns empty list on un-cached items
self.assertEqual(self.checked_class.all(self.db_conn), [])
# check that all() shows only cached/saved items
self.assertEqual(sorted(self.checked_class.all(self.db_conn)),
sorted([item1, item2, item3]))
- @_within_checked_class
+ @TestCaseAugmented._run_if_checked_class
def test_singularity(self) -> None:
"""Test pointers made for single object keep pointing to it."""
id1 = self.default_ids[0]
- obj = self.checked_class(id1, **self.default_init_kwargs)
+ obj = self._make_from_defaults(id1)
obj.save(self.db_conn)
# change object, expect retrieved through .by_id to carry change
attr_name = self._change_obj(obj)
retrieved = self.checked_class.by_id(self.db_conn, id1)
self.assertEqual(new_attr, getattr(retrieved, attr_name))
- @_within_checked_class
- def test_versioned_singularity_title(self) -> None:
- """Test singularity of VersionedAttributes on saving (with .title)."""
- if 'title' in self.test_versioneds:
- obj = self.checked_class(None)
- obj.save(self.db_conn)
- assert isinstance(obj.id_, int)
- # change obj, expect retrieved through .by_id to carry change
- obj.title.set('named')
- retrieved = self.checked_class.by_id(self.db_conn, obj.id_)
- self.assertEqual(obj.title.history, retrieved.title.history)
-
- @_within_checked_class
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_versioned_singularity(self,
+ owner: Any,
+ attr_name: str,
+ attr: VersionedAttribute,
+ _: str | float,
+ to_set: list[str] | list[float]
+ ) -> None:
+ """Test singularity of VersionedAttributes on saving."""
+ owner.save(self.db_conn)
+ # change obj, expect retrieved through .by_id to carry change
+ attr.set(to_set[0])
+ retrieved = self.checked_class.by_id(self.db_conn, owner.id_)
+ attr_retrieved = getattr(retrieved, attr_name)
+ self.assertEqual(attr.history, attr_retrieved.history)
+
+ @TestCaseAugmented._run_if_checked_class
def test_remove(self) -> None:
"""Test .remove() effects on DB and cache."""
id_ = self.default_ids[0]
- obj = self.checked_class(id_, **self.default_init_kwargs)
+ obj = self._make_from_defaults(id_)
# check removal only works after saving
with self.assertRaises(HandledException):
obj.remove(self.db_conn)
self.server_thread.start()
self.conn = HTTPConnection(str(self.httpd.server_address[0]),
self.httpd.server_address[1])
- self.httpd.set_json_mode()
+ self.httpd.render_mode = 'json'
def tearDown(self) -> None:
self.httpd.shutdown()
@staticmethod
def as_id_list(items: list[dict[str, object]]) -> list[int | str]:
"""Return list of only 'id' fields of items."""
+ # NB: To tighten the mypy test, consider to, instead of returning
+ # list[str | int], returnlist[int] | list[str]. But since so far to me
+ # the only way to make that work seems to be to repaclement of the
+ # currently active last line with complexity of the out-commented code
+ # block beneath, I currently opt for the status quo.
id_list = []
for item in items:
assert isinstance(item['id'], (int, str))
id_list += [item['id']]
return id_list
+ # if id_list:
+ # if isinstance(id_list[0], int):
+ # for id_ in id_list:
+ # assert isinstance(id_, int)
+ # l_int: list[int] = [id_ for id_ in id_list
+ # if isinstance(id_, int)]
+ # return l_int
+ # for id_ in id_list:
+ # assert isinstance(id_, str)
+ # l_str: list[str] = [id_ for id_ in id_list
+ # if isinstance(id_, str)]
+ # return l_str
+ # return []
@staticmethod
def as_refs(items: list[dict[str, object]]
d['_versioned']['description'][i] = description
return d
+ @staticmethod
+ def procstep_as_dict(id_: int,
+ owner_id: int,
+ step_process_id: int,
+ parent_step_id: int | None = None
+ ) -> dict[str, object]:
+ """Return JSON of Process to expect."""
+ return {'id': id_,
+ 'owner_id': owner_id,
+ 'step_process_id': step_process_id,
+ 'parent_step_id': parent_step_id}
+
+ @staticmethod
+ def todo_as_dict(id_: int = 1,
+ process_id: int = 1,
+ date: str = '2024-01-01',
+ conditions: None | list[int] = None,
+ disables: None | list[int] = None,
+ blockers: None | list[int] = None,
+ enables: None | list[int] = None,
+ calendarize: bool = False,
+ comment: str = '',
+ is_done: bool = False,
+ effort: float | None = None,
+ children: list[int] | None = None,
+ parents: list[int] | None = None,
+ ) -> dict[str, object]:
+ """Return JSON of Todo to expect."""
+ # pylint: disable=too-many-arguments
+ d = {'id': id_,
+ 'date': date,
+ 'process_id': process_id,
+ 'is_done': is_done,
+ 'calendarize': calendarize,
+ 'comment': comment,
+ 'children': children if children else [],
+ 'parents': parents if parents else [],
+ 'effort': effort,
+ 'conditions': conditions if conditions else [],
+ 'disables': disables if disables else [],
+ 'blockers': blockers if blockers else [],
+ 'enables': enables if enables else []}
+ return d
+
@staticmethod
def proc_as_dict(id_: int = 1,
title: str = 'A',
conditions: None | list[int] = None,
disables: None | list[int] = None,
blockers: None | list[int] = None,
- enables: None | list[int] = None
+ enables: None | list[int] = None,
+ explicit_steps: None | list[int] = None
) -> dict[str, object]:
"""Return JSON of Process to expect."""
# pylint: disable=too-many-arguments
d = {'id': id_,
'calendarize': False,
'suppressed_steps': [],
- 'explicit_steps': [],
+ 'explicit_steps': explicit_steps if explicit_steps else [],
'_versioned': {
'title': {0: title},
'description': {0: description},
self.assertEqual(self.conn.getresponse().status, expected_code)
def check_post(self, data: Mapping[str, object], target: str,
- expected_code: int, redirect_location: str = '') -> None:
+ expected_code: int = 302, redir: str = '') -> None:
"""Check that POST of data to target yields expected_code."""
encoded_form_data = urlencode(data, doseq=True).encode('utf-8')
headers = {'Content-Type': 'application/x-www-form-urlencoded',
self.conn.request('POST', target,
body=encoded_form_data, headers=headers)
if 302 == expected_code:
- if redirect_location == '':
- redirect_location = target
- self.check_redirect(redirect_location)
+ redir = target if redir == '' else redir
+ self.check_redirect(redir)
else:
self.assertEqual(self.conn.getresponse().status, expected_code)
"""POST basic Process."""
if not form_data:
form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1}
- self.check_post(form_data, f'/process?id={id_}', 302,
- f'/process?id={id_}')
+ self.check_post(form_data, f'/process?id={id_}',
+ redir=f'/process?id={id_}')
return form_data
def check_json_get(self, path: str, expected: dict[str, object]) -> None:
timestamp keys of VersionedAttribute history keys into integers
counting chronologically forward from 0.
"""
+
def rewrite_history_keys_in(item: Any) -> Any:
if isinstance(item, dict):
if '_versioned' in item.keys():
elif isinstance(item, list):
item[:] = [rewrite_history_keys_in(i) for i in item]
return item
+
self.conn.request('GET', path)
response = self.conn.getresponse()
self.assertEqual(response.status, 200)
retrieved = json_loads(response.read().decode())
rewrite_history_keys_in(retrieved)
+ # import pprint
+ # pprint.pprint(expected)
+ # pprint.pprint(retrieved)
self.assertEqual(expected, retrieved)
+++ /dev/null
-""""Test Versioned Attributes in the abstract."""
-from unittest import TestCase
-from time import sleep
-from datetime import datetime
-from tests.utils import TestCaseWithDB
-from plomtask.versioned_attributes import VersionedAttribute, TIMESTAMP_FMT
-from plomtask.db import BaseModel
-
-SQL_TEST_TABLE_STR = '''
-CREATE TABLE versioned_tests (
- parent INTEGER NOT NULL,
- timestamp TEXT NOT NULL,
- value TEXT NOT NULL,
- PRIMARY KEY (parent, timestamp)
-);
-'''
-SQL_TEST_TABLE_FLOAT = '''
-CREATE TABLE versioned_tests (
- parent INTEGER NOT NULL,
- timestamp TEXT NOT NULL,
- value REAL NOT NULL,
- PRIMARY KEY (parent, timestamp)
-);
-'''
-
-
-class TestParentType(BaseModel[int]):
- """Dummy abstracting whatever may use VersionedAttributes."""
-
-
-class TestsSansDB(TestCase):
- """Tests not requiring DB setup."""
-
- def test_VersionedAttribute_set(self) -> None:
- """Test .set() behaves as expected."""
- # check value gets set even if already is the default
- attr = VersionedAttribute(None, '', 'A')
- attr.set('A')
- self.assertEqual(list(attr.history.values()), ['A'])
- # check same value does not get set twice in a row,
- # and that not even its timestamp get updated
- timestamp = list(attr.history.keys())[0]
- attr.set('A')
- self.assertEqual(list(attr.history.values()), ['A'])
- self.assertEqual(list(attr.history.keys())[0], timestamp)
- # check that different value _will_ be set/added
- attr.set('B')
- self.assertEqual(sorted(attr.history.values()), ['A', 'B'])
- # check that a previously used value can be set if not most recent
- attr.set('A')
- self.assertEqual(sorted(attr.history.values()), ['A', 'A', 'B'])
- # again check for same value not being set twice in a row, even for
- # later items
- attr.set('D')
- self.assertEqual(sorted(attr.history.values()), ['A', 'A', 'B', 'D'])
- attr.set('D')
- self.assertEqual(sorted(attr.history.values()), ['A', 'A', 'B', 'D'])
-
- def test_VersionedAttribute_newest(self) -> None:
- """Test .newest returns newest element, or default on empty."""
- attr = VersionedAttribute(None, '', 'A')
- self.assertEqual(attr.newest, 'A')
- attr.set('B')
- self.assertEqual(attr.newest, 'B')
- attr.set('C')
-
- def test_VersionedAttribute_at(self) -> None:
- """Test .at() returns values nearest to queried time, or default."""
- # check .at() return default on empty history
- attr = VersionedAttribute(None, '', 'A')
- timestamp_a = datetime.now().strftime(TIMESTAMP_FMT)
- self.assertEqual(attr.at(timestamp_a), 'A')
- # check value exactly at timestamp returned
- attr.set('B')
- timestamp_b = list(attr.history.keys())[0]
- self.assertEqual(attr.at(timestamp_b), 'B')
- # check earliest value returned if exists, rather than default
- self.assertEqual(attr.at(timestamp_a), 'B')
- # check reverts to previous value for timestamps not indexed
- sleep(0.00001)
- timestamp_between = datetime.now().strftime(TIMESTAMP_FMT)
- sleep(0.00001)
- attr.set('C')
- timestamp_c = sorted(attr.history.keys())[-1]
- self.assertEqual(attr.at(timestamp_c), 'C')
- self.assertEqual(attr.at(timestamp_between), 'B')
- sleep(0.00001)
- timestamp_after_c = datetime.now().strftime(TIMESTAMP_FMT)
- self.assertEqual(attr.at(timestamp_after_c), 'C')
-
-
-class TestsWithDBStr(TestCaseWithDB):
- """Module tests requiring DB setup."""
- default_vals: list[str | float] = ['A', 'B', 'C']
- init_sql = SQL_TEST_TABLE_STR
-
- def setUp(self) -> None:
- super().setUp()
- self.db_conn.exec(self.init_sql)
- self.test_parent = TestParentType(1)
- self.attr = VersionedAttribute(self.test_parent,
- 'versioned_tests', self.default_vals[0])
-
- def test_VersionedAttribute_save(self) -> None:
- """Test .save() to write to DB."""
- # check mere .set() calls do not by themselves reflect in the DB
- self.attr.set(self.default_vals[1])
- self.assertEqual([],
- self.db_conn.row_where('versioned_tests',
- 'parent', 1))
- # check .save() makes history appear in DB
- self.attr.save(self.db_conn)
- vals_found = []
- for row in self.db_conn.row_where('versioned_tests', 'parent', 1):
- vals_found += [row[2]]
- self.assertEqual([self.default_vals[1]], vals_found)
- # check .save() also updates history in DB
- self.attr.set(self.default_vals[2])
- self.attr.save(self.db_conn)
- vals_found = []
- for row in self.db_conn.row_where('versioned_tests', 'parent', 1):
- vals_found += [row[2]]
- self.assertEqual([self.default_vals[1], self.default_vals[2]],
- sorted(vals_found))
-
- def test_VersionedAttribute_history_from_row(self) -> None:
- """"Test .history_from_row() properly interprets DB rows."""
- self.attr.set(self.default_vals[1])
- self.attr.set(self.default_vals[2])
- self.attr.save(self.db_conn)
- loaded_attr = VersionedAttribute(self.test_parent, 'versioned_tests',
- self.default_vals[0])
- for row in self.db_conn.row_where('versioned_tests', 'parent', 1):
- loaded_attr.history_from_row(row)
- for timestamp, value in self.attr.history.items():
- self.assertEqual(value, loaded_attr.history[timestamp])
- self.assertEqual(len(self.attr.history.keys()),
- len(loaded_attr.history.keys()))
-
-
-class TestsWithDBFloat(TestsWithDBStr):
- """Module tests requiring DB setup."""
- default_vals: list[str | float] = [0.9, 1.1, 2]
- init_sql = SQL_TEST_TABLE_FLOAT