class Condition(BaseModel[int]):
"""Non-Process dependency for ProcessSteps and Todos."""
table_name = 'conditions'
- to_save = ['is_active']
- to_save_versioned = ['title', 'description']
+ to_save_simples = ['is_active']
+ versioned_defaults = {'title': 'UNNAMED', 'description': ''}
to_search = ['title.newest', 'description.newest']
can_create_by_id = True
sorters = {'is_active': lambda c: c.is_active,
def __init__(self, id_: int | None, is_active: bool = False) -> None:
super().__init__(id_)
self.is_active = is_active
- self.title = VersionedAttribute(self, 'condition_titles', 'UNNAMED')
- self.description = VersionedAttribute(self, 'condition_descriptions',
- '')
+ for name in ['title', 'description']:
+ attr = VersionedAttribute(self, f'condition_{name}s',
+ self.versioned_defaults[name])
+ setattr(self, name, attr)
def remove(self, db_conn: DatabaseConnection) -> None:
"""Remove from DB, with VersionedAttributes.
class Day(BaseModel[str]):
"""Individual days defined by their dates."""
table_name = 'days'
- to_save = ['comment']
+ to_save_simples = ['comment']
add_to_dict = ['todos']
can_create_by_id = True
day.todos = Todo.by_date(db_conn, day.id_)
return day
- @classmethod
- def by_date_range_filled(cls, db_conn: DatabaseConnection,
- start: str, end: str) -> list[Day]:
- """Return days existing and non-existing between dates start/end."""
- ret = cls.by_date_range_with_limits(db_conn, (start, end), 'id')
- days, start_date, end_date = ret
- return cls.with_filled_gaps(days, start_date, end_date)
-
@classmethod
def with_filled_gaps(cls, days: list[Day], start_date: str, end_date: str
) -> list[Day]:
- """In days, fill with (un-saved) Days gaps between start/end_date."""
+ """In days, fill with (un-stored) Days gaps between start/end_date."""
+ days = days[:]
+ start_date, end_date = valid_date(start_date), valid_date(end_date)
if start_date > end_date:
- return days
+ return []
+ days = [d for d in days if d.date >= start_date and d.date <= end_date]
days.sort()
if start_date not in [d.date for d in days]:
days[:] = [Day(start_date)] + days
class BaseModel(Generic[BaseModelId]):
"""Template for most of the models we use/derive from the DB."""
table_name = ''
- to_save: list[str] = []
- to_save_versioned: list[str] = []
+ to_save_simples: list[str] = []
to_save_relations: list[tuple[str, str, str, int]] = []
+ versioned_defaults: dict[str, str | float] = {}
add_to_dict: list[str] = []
id_: None | BaseModelId
cache_: dict[BaseModelId, Self]
self.id_ = id_
def __hash__(self) -> int:
- hashable = [self.id_] + [getattr(self, name) for name in self.to_save]
+ hashable = [self.id_] + [getattr(self, name)
+ for name in self.to_save_simples]
for definition in self.to_save_relations:
attr = getattr(self, definition[2])
hashable += [tuple(rel.id_ for rel in attr)]
- for name in self.to_save_versioned:
+ for name in self.to_save_versioned():
hashable += [hash(getattr(self, name))]
return hash(tuple(hashable))
assert isinstance(other.id_, int)
return self.id_ < other.id_
+ @classmethod
+ def to_save_versioned(cls) -> list[str]:
+        """Return keys of cls.versioned_defaults as the attributes to save."""
+ return list(cls.versioned_defaults.keys())
+
@property
def as_dict(self) -> dict[str, object]:
"""Return self as (json.dumps-compatible) dict."""
library: dict[str, dict[str | int, object]] = {}
d: dict[str, object] = {'id': self.id_, '_library': library}
- for to_save in self.to_save:
+ for to_save in self.to_save_simples:
attr = getattr(self, to_save)
if hasattr(attr, 'as_dict_into_reference'):
d[to_save] = attr.as_dict_into_reference(library)
else:
d[to_save] = attr
- if len(self.to_save_versioned) > 0:
+ if len(self.to_save_versioned()) > 0:
d['_versioned'] = {}
- for k in self.to_save_versioned:
+ for k in self.to_save_versioned():
attr = getattr(self, k)
assert isinstance(d['_versioned'], dict)
d['_versioned'][k] = attr.history
@classmethod
def sort_by(cls, seq: list[Any], sort_key: str, default: str = 'title'
) -> str:
- """Sort cls list by cls.sorters[sort_key] (reverse if '-'-prefixed)."""
+ """Sort cls list by cls.sorters[sort_key] (reverse if '-'-prefixed).
+
+        Before cls.sorters[sort_key] is applied, seq is sorted by .id_, to
+        ensure predictable results where items share the same sort value.
+ """
reverse = False
if len(sort_key) > 1 and '-' == sort_key[0]:
sort_key = sort_key[1:]
reverse = True
if sort_key not in cls.sorters:
sort_key = default
+ seq.sort(key=lambda x: x.id_, reverse=reverse)
sorter: Callable[..., Any] = cls.sorters[sort_key]
seq.sort(key=sorter, reverse=reverse)
if reverse:
"""Make from DB row (sans relations), update DB cache with it."""
obj = cls(*row)
assert obj.id_ is not None
- for attr_name in cls.to_save_versioned:
+ for attr_name in cls.to_save_versioned():
attr = getattr(obj, attr_name)
table_name = attr.table_name
for row_ in db_conn.row_where(table_name, 'parent', obj.id_):
date_col: str = 'day'
) -> tuple[list[BaseModelInstance], str,
str]:
- """Return list of items in database within (open) date_range interval.
+ """Return list of items in DB within (closed) date_range interval.
If no range values provided, defaults them to 'yesterday' and
'tomorrow'. Knows to properly interpret these and 'today' as value.
"""Write self to DB and cache and ensure .id_.
Write both to DB, and to cache. To DB, write .id_ and attributes
- listed in cls.to_save[_versioned|_relations].
+ listed in cls.to_save_[simples|versioned|_relations].
Ensure self.id_ by setting it to what the DB command returns as the
last saved row's ID (cursor.lastrowid), EXCEPT if self.id_ already
only the case with the Day class, where it's to be a date string.
"""
values = tuple([self.id_] + [getattr(self, key)
- for key in self.to_save])
+ for key in self.to_save_simples])
table_name = self.table_name
cursor = db_conn.exec_on_vals(f'REPLACE INTO {table_name} VALUES',
values)
if not isinstance(self.id_, str):
self.id_ = cursor.lastrowid # type: ignore[assignment]
self.cache()
- for attr_name in self.to_save_versioned:
+ for attr_name in self.to_save_versioned():
getattr(self, attr_name).save(db_conn)
for table, column, attr_name, key_index in self.to_save_relations:
assert isinstance(self.id_, (int, str))
"""Remove from DB and cache, including dependencies."""
if self.id_ is None or self._get_cached(self.id_) is None:
raise HandledException('cannot remove unsaved item')
- for attr_name in self.to_save_versioned:
+ for attr_name in self.to_save_versioned():
getattr(self, attr_name).remove(db_conn)
for table, column, attr_name, _ in self.to_save_relations:
db_conn.delete_where(table, column, self.id_)
same, the only difference being the HTML template they are rendered to,
which .do_GET selects from their method name.
"""
- start = self._params.get_str('start')
- end = self._params.get_str('end')
- if not end:
- end = date_in_n_days(366)
- ret = Day.by_date_range_with_limits(self.conn, (start, end), 'id')
- days, start, end = ret
+ start, end = self._params.get_str('start'), self._params.get_str('end')
+ end = end if end else date_in_n_days(366)
+ days, start, end = Day.by_date_range_with_limits(self.conn,
+ (start, end), 'id')
days = Day.with_filled_gaps(days, start, end)
today = date_in_n_days(0)
return {'start': start, 'end': end, 'days': days, 'today': today}
"""Template for, and metadata for, Todos, and their arrangements."""
# pylint: disable=too-many-instance-attributes
table_name = 'processes'
- to_save = ['calendarize']
- to_save_versioned = ['title', 'description', 'effort']
+ to_save_simples = ['calendarize']
to_save_relations = [('process_conditions', 'process', 'conditions', 0),
('process_blockers', 'process', 'blockers', 0),
('process_enables', 'process', 'enables', 0),
('process_step_suppressions', 'process',
'suppressed_steps', 0)]
add_to_dict = ['explicit_steps']
+ versioned_defaults = {'title': 'UNNAMED', 'description': '', 'effort': 1.0}
to_search = ['title.newest', 'description.newest']
can_create_by_id = True
sorters = {'steps': lambda p: len(p.explicit_steps),
def __init__(self, id_: int | None, calendarize: bool = False) -> None:
BaseModel.__init__(self, id_)
ConditionsRelations.__init__(self)
- self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
- self.description = VersionedAttribute(self, 'process_descriptions', '')
- self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
+ for name in ['title', 'description', 'effort']:
+ attr = VersionedAttribute(self, f'process_{name}s',
+ self.versioned_defaults[name])
+ setattr(self, name, attr)
self.explicit_steps: list[ProcessStep] = []
self.suppressed_steps: list[ProcessStep] = []
self.calendarize = calendarize
class ProcessStep(BaseModel[int]):
"""Sub-unit of Processes."""
table_name = 'process_steps'
- to_save = ['owner_id', 'step_process_id', 'parent_step_id']
+ to_save_simples = ['owner_id', 'step_process_id', 'parent_step_id']
def __init__(self, id_: int | None, owner_id: int, step_process_id: int,
parent_step_id: int | None) -> None:
# pylint: disable=too-many-instance-attributes
# pylint: disable=too-many-public-methods
table_name = 'todos'
- to_save = ['process_id', 'is_done', 'date', 'comment', 'effort',
- 'calendarize']
+ to_save_simples = ['process_id', 'is_done', 'date', 'comment', 'effort',
+ 'calendarize']
to_save_relations = [('todo_conditions', 'todo', 'conditions', 0),
('todo_blockers', 'todo', 'blockers', 0),
('todo_enables', 'todo', 'enables', 0),
@property
def title(self) -> VersionedAttribute:
"""Shortcut to .process.title."""
+ assert isinstance(self.process.title, VersionedAttribute)
return self.process.title
@property
parent: Any, table_name: str, default: str | float) -> None:
self.parent = parent
self.table_name = table_name
- self.default = default
+ self._default = default
self.history: dict[str, str | float] = {}
def __hash__(self) -> int:
history_tuples = tuple((k, v) for k, v in self.history.items())
- hashable = (self.parent.id_, self.table_name, self.default,
+ hashable = (self.parent.id_, self.table_name, self._default,
history_tuples)
return hash(hashable)
"""Return most recent timestamp."""
return sorted(self.history.keys())[-1]
+ @property
+ def value_type_name(self) -> str:
+ """Return string of name of attribute value type."""
+ return type(self._default).__name__
+
@property
def newest(self) -> str | float:
- """Return most recent value, or self.default if self.history empty."""
+ """Return most recent value, or self._default if self.history empty."""
if 0 == len(self.history):
- return self.default
+ return self._default
return self.history[self._newest_timestamp]
def reset_timestamp(self, old_str: str, new_str: str) -> None:
queried_time += ' 23:59:59.999'
sorted_timestamps = sorted(self.history.keys())
if 0 == len(sorted_timestamps):
- return self.default
+ return self._default
selected_timestamp = sorted_timestamps[0]
for timestamp in sorted_timestamps[1:]:
if timestamp > queried_time:
--- /dev/null
+{% extends '_base.html' %}
+
+{% block content %}
+<h3>calendar</h3>
+
+<p><a href="/calendar">normal view</a></p>
+
+<form action="calendar_txt" method="GET">
+from <input name="start" class="date" value="{{start}}" />
+to <input name="end" class="date" value="{{end}}" />
+<input type="submit" value="OK" />
+</form>
+<table>
+
+<pre>{% for day in days %}{% if day.weekday == "Monday" %}
+---{% endif %}{% if day.comment or day.calendarized_todos %}
+{{day.weekday|truncate(2,True,'',0)}} {{day.date}} {{day.comment|e}}{% endif %}{% if day.calendarized_todos%}{% for todo in day.calendarized_todos %}
+* {{todo.title_then|e}}{% if todo.comment %} / {{todo.comment|e}}{% endif %}{% endfor %}{% endif %}{% endfor %}
+</pre>
+{% endblock %}
class TestsSansDB(TestCaseSansDB):
"""Tests requiring no DB setup."""
checked_class = Condition
- versioned_defaults_to_test = {'title': 'UNNAMED', 'description': ''}
class TestsWithDB(TestCaseWithDB):
"""Tests requiring DB, but not server setup."""
checked_class = Condition
default_init_kwargs = {'is_active': False}
- test_versioneds = {'title': str, 'description': str}
def test_remove(self) -> None:
"""Test .remove() effects on DB and cache."""
proc = Process(None)
proc.save(self.db_conn)
todo = Todo(None, proc, False, '2024-01-01')
+ todo.save(self.db_conn)
+ # check condition can only be deleted if not depended upon
for depender in (proc, todo):
assert hasattr(depender, 'save')
assert hasattr(depender, 'set_conditions')
c = Condition(None)
c.save(self.db_conn)
- depender.save(self.db_conn)
- depender.set_conditions(self.db_conn, [c.id_], 'conditions')
+ depender.set_conditions(self.db_conn, [c.id_])
depender.save(self.db_conn)
with self.assertRaises(HandledException):
c.remove(self.db_conn)
- depender.set_conditions(self.db_conn, [], 'conditions')
+ depender.set_conditions(self.db_conn, [])
depender.save(self.db_conn)
c.remove(self.db_conn)
def test_fail_POST_condition(self) -> None:
"""Test malformed/illegal POST /condition requests."""
- # check invalid POST payloads
+ # check incomplete POST payloads
url = '/condition'
self.check_post({}, url, 400)
self.check_post({'title': ''}, url, 400)
valid_payload = {'title': '', 'description': '', 'is_active': False}
self.check_post(valid_payload, '/condition?id=foo', 400)
- def test_do_POST_condition(self) -> None:
+ def test_POST_condition(self) -> None:
"""Test (valid) POST /condition and its effect on GET /condition[s]."""
# test valid POST's effect on …
post = {'title': 'foo', 'description': 'oof', 'is_active': False}
- self.check_post(post, '/condition', 302, '/condition?id=1')
+ self.check_post(post, '/condition', redir='/condition?id=1')
# … single /condition
- cond = self.cond_as_dict(titles=['foo'], descriptions=['oof'])
- assert isinstance(cond['_versioned'], dict)
- expected_single = self.GET_condition_dict(cond)
+ expected_cond = self.cond_as_dict(titles=['foo'], descriptions=['oof'])
+ assert isinstance(expected_cond['_versioned'], dict)
+ expected_single = self.GET_condition_dict(expected_cond)
self.check_json_get('/condition?id=1', expected_single)
# … full /conditions
- expected_all = self.GET_conditions_dict([cond])
+ expected_all = self.GET_conditions_dict([expected_cond])
self.check_json_get('/conditions', expected_all)
# test (no) effect of invalid POST to existing Condition on /condition
self.check_post({}, '/condition?id=1', 400)
self.check_json_get('/condition?id=1', expected_single)
# test effect of POST changing title and activeness
post = {'title': 'bar', 'description': 'oof', 'is_active': True}
- self.check_post(post, '/condition?id=1', 302)
- cond['_versioned']['title'][1] = 'bar'
- cond['is_active'] = True
+ self.check_post(post, '/condition?id=1')
+ expected_cond['_versioned']['title'][1] = 'bar'
+ expected_cond['is_active'] = True
self.check_json_get('/condition?id=1', expected_single)
- # test deletion POST's effect on …
- self.check_post({'delete': ''}, '/condition?id=1', 302, '/conditions')
- cond = self.cond_as_dict()
+ # test deletion POST's effect, both to return id=1 into empty single, …
+ self.check_post({'delete': ''}, '/condition?id=1', redir='/conditions')
+ expected_cond = self.cond_as_dict()
assert isinstance(expected_single['_library'], dict)
- expected_single['_library']['Condition'] = self.as_refs([cond])
+ expected_single['_library']['Condition'] = self.as_refs(
+ [expected_cond])
self.check_json_get('/condition?id=1', expected_single)
- # … full /conditions
+ # … and full /conditions into empty list
expected_all['conditions'] = []
expected_all['_library'] = {}
self.check_json_get('/conditions', expected_all)
- def test_do_GET_condition(self) -> None:
+ def test_GET_condition(self) -> None:
"""More GET /condition testing, especially for Process relations."""
# check expected default status codes
self.check_get_defaults('/condition')
# make Condition and two Processes that among them establish all
# possible ConditionsRelations to it, …
cond_post = {'title': 'foo', 'description': 'oof', 'is_active': False}
- self.check_post(cond_post, '/condition', 302, '/condition?id=1')
+ self.check_post(cond_post, '/condition', redir='/condition?id=1')
proc1_post = {'title': 'A', 'description': '', 'effort': 1.0,
'conditions': [1], 'disables': [1]}
proc2_post = {'title': 'B', 'description': '', 'effort': 1.0,
self.post_process(1, proc1_post)
self.post_process(2, proc2_post)
# … then check /condition displays all these properly.
- cond = self.cond_as_dict(titles=['foo'], descriptions=['oof'])
- assert isinstance(cond['id'], int)
- proc1 = self.proc_as_dict(conditions=[cond['id']],
- disables=[cond['id']])
+ cond_expected = self.cond_as_dict(titles=['foo'], descriptions=['oof'])
+ assert isinstance(cond_expected['id'], int)
+ proc1 = self.proc_as_dict(conditions=[cond_expected['id']],
+ disables=[cond_expected['id']])
proc2 = self.proc_as_dict(2, 'B',
- blockers=[cond['id']],
- enables=[cond['id']])
- expected = self.GET_condition_dict(cond)
- assert isinstance(expected['_library'], dict)
- expected['enabled_processes'] = self.as_id_list([proc1])
- expected['disabled_processes'] = self.as_id_list([proc2])
- expected['enabling_processes'] = self.as_id_list([proc2])
- expected['disabling_processes'] = self.as_id_list([proc1])
- expected['_library']['Process'] = self.as_refs([proc1, proc2])
- self.check_json_get('/condition?id=1', expected)
+ blockers=[cond_expected['id']],
+ enables=[cond_expected['id']])
+ display_expected = self.GET_condition_dict(cond_expected)
+ assert isinstance(display_expected['_library'], dict)
+ display_expected['enabled_processes'] = self.as_id_list([proc1])
+ display_expected['disabled_processes'] = self.as_id_list([proc2])
+ display_expected['enabling_processes'] = self.as_id_list([proc2])
+ display_expected['disabling_processes'] = self.as_id_list([proc1])
+ display_expected['_library']['Process'] = self.as_refs([proc1, proc2])
+ self.check_json_get('/condition?id=1', display_expected)
- def test_do_GET_conditions(self) -> None:
+ def test_GET_conditions(self) -> None:
"""Test GET /conditions."""
# test empty result on empty DB, default-settings on empty params
expected = self.GET_conditions_dict([])
self.check_json_get('/conditions', expected)
- # test on meaningless non-empty params (incl. entirely un-used key),
+ # test ignorance of meaningless non-empty params (incl. unknown key),
# that 'sort_by' default to 'title' (even if set to something else, as
# long as without handler) and 'pattern' get preserved
expected['pattern'] = 'bar' # preserved despite zero effect!
+ expected['sort_by'] = 'title' # for clarity (actually already set)
url = '/conditions?sort_by=foo&pattern=bar&foo=x'
self.check_json_get(url, expected)
# test non-empty result, automatic (positive) sorting by title
- post1 = {'is_active': False, 'title': 'foo', 'description': 'oof'}
- post2 = {'is_active': False, 'title': 'bar', 'description': 'rab'}
- post3 = {'is_active': True, 'title': 'baz', 'description': 'zab'}
- self.check_post(post1, '/condition', 302, '/condition?id=1')
- self.check_post(post2, '/condition', 302, '/condition?id=2')
- self.check_post(post3, '/condition', 302, '/condition?id=3')
+ post_cond1 = {'is_active': False, 'title': 'foo', 'description': 'oof'}
+ post_cond2 = {'is_active': False, 'title': 'bar', 'description': 'rab'}
+ post_cond3 = {'is_active': True, 'title': 'baz', 'description': 'zab'}
+ self.check_post(post_cond1, '/condition', redir='/condition?id=1')
+ self.check_post(post_cond2, '/condition', redir='/condition?id=2')
+ self.check_post(post_cond3, '/condition', redir='/condition?id=3')
cond1 = self.cond_as_dict(1, False, ['foo'], ['oof'])
cond2 = self.cond_as_dict(2, False, ['bar'], ['rab'])
cond3 = self.cond_as_dict(3, True, ['baz'], ['zab'])
expected = self.GET_conditions_dict([cond2, cond3, cond1])
self.check_json_get('/conditions', expected)
# test other sortings
- # (NB: by .is_active has two items of =False, their order currently
- # is not explicitly made predictable, so mail fail until we do)
- expected['conditions'] = self.as_id_list([cond1, cond3, cond2])
expected['sort_by'] = '-title'
+ assert isinstance(expected['conditions'], list)
+ expected['conditions'].reverse()
self.check_json_get('/conditions?sort_by=-title', expected)
- expected['conditions'] = self.as_id_list([cond1, cond2, cond3])
expected['sort_by'] = 'is_active'
+ expected['conditions'] = self.as_id_list([cond1, cond2, cond3])
self.check_json_get('/conditions?sort_by=is_active', expected)
- expected['conditions'] = self.as_id_list([cond3, cond1, cond2])
expected['sort_by'] = '-is_active'
+ expected['conditions'].reverse()
self.check_json_get('/conditions?sort_by=-is_active', expected)
# test pattern matching on title
expected = self.GET_conditions_dict([cond2, cond3])
self.check_json_get('/conditions?pattern=ba', expected)
# test pattern matching on description
assert isinstance(expected['_library'], dict)
+ expected['pattern'] = 'of'
expected['conditions'] = self.as_id_list([cond1])
expected['_library']['Condition'] = self.as_refs([cond1])
- expected['pattern'] = 'oo'
- self.check_json_get('/conditions?pattern=oo', expected)
+ self.check_json_get('/conditions?pattern=of', expected)
"""Test Days module."""
-from unittest import TestCase
-from datetime import datetime
+from datetime import datetime, timedelta
from typing import Callable
-from tests.utils import TestCaseWithDB, TestCaseWithServer
-from plomtask.dating import date_in_n_days
+from tests.utils import TestCaseSansDB, TestCaseWithDB, TestCaseWithServer
+from plomtask.dating import date_in_n_days, DATE_FORMAT
from plomtask.days import Day
-class TestsSansDB(TestCase):
+class TestsSansDB(TestCaseSansDB):
"""Days module tests not requiring DB setup."""
- legal_ids = ['2024-01-01']
- illegal_ids = ['foo', '2024-02-30', '2024-02-01 23:00:00']
+ checked_class = Day
+ legal_ids = ['2024-01-01', '2024-02-29']
+ illegal_ids = ['foo', '2023-02-29', '2024-02-30', '2024-02-01 23:00:00']
+
+ def test_date_in_n_days(self) -> None:
+ """Test dating.date_in_n_days, as we rely on it in later tests."""
+ for n in [-100, -2, -1, 0, 1, 2, 1000]:
+ date = datetime.now() + timedelta(days=n)
+ self.assertEqual(date_in_n_days(n), date.strftime(DATE_FORMAT))
def test_Day_datetime_weekday_neighbor_dates(self) -> None:
- """Test Day's date parsing."""
+ """Test Day's date parsing and neighbourhood resolution."""
self.assertEqual(datetime(2024, 5, 1), Day('2024-05-01').datetime)
self.assertEqual('Sunday', Day('2024-03-17').weekday)
self.assertEqual('March', Day('2024-03-17').month_name)
self.assertEqual('2023-12-31', Day('2024-01-01').prev_date)
self.assertEqual('2023-03-01', Day('2023-02-28').next_date)
- def test_Day_sorting(self) -> None:
- """Test sorting by .__lt__ and Day.__eq__."""
- day1 = Day('2024-01-01')
- day2 = Day('2024-01-02')
- day3 = Day('2024-01-03')
- days = [day3, day1, day2]
- self.assertEqual(sorted(days), [day1, day2, day3])
-
class TestsWithDB(TestCaseWithDB):
"""Tests requiring DB, but not server setup."""
checked_class = Day
default_ids = ('2024-01-01', '2024-01-02', '2024-01-03')
- def test_Day_by_date_range_filled(self) -> None:
- """Test Day.by_date_range_filled."""
- date1, date2, date3 = self.default_ids
- day1 = Day(date1)
- day2 = Day(date2)
- day3 = Day(date3)
- for day in [day1, day2, day3]:
- day.save(self.db_conn)
- # check date range includes limiter days
- self.assertEqual(Day.by_date_range_filled(self.db_conn, date1, date3),
- [day1, day2, day3])
- # check first date range value excludes what's earlier
- self.assertEqual(Day.by_date_range_filled(self.db_conn, date2, date3),
- [day2, day3])
- # check second date range value excludes what's later
- self.assertEqual(Day.by_date_range_filled(self.db_conn, date1, date2),
- [day1, day2])
- # check swapped (impossible) date range returns emptiness
- self.assertEqual(Day.by_date_range_filled(self.db_conn, date3, date1),
- [])
- # check fill_gaps= instantiates unsaved dates within date range
- # (but does not store them)
- day5 = Day('2024-01-05')
- day6 = Day('2024-01-06')
- day6.save(self.db_conn)
- day7 = Day('2024-01-07')
- self.assertEqual(Day.by_date_range_filled(self.db_conn,
- day5.date, day7.date),
- [day5, day6, day7])
- self.check_identity_with_cache_and_db([day1, day2, day3, day6])
- # check 'today' is interpreted as today's date
- today = Day(date_in_n_days(0))
- self.assertEqual(Day.by_date_range_filled(self.db_conn,
- 'today', 'today'),
- [today])
- prev_day = Day(date_in_n_days(-1))
- next_day = Day(date_in_n_days(1))
- self.assertEqual(Day.by_date_range_filled(self.db_conn,
- 'yesterday', 'tomorrow'),
- [prev_day, today, next_day])
+ def test_Day_by_date_range_with_limits(self) -> None:
+ """Test .by_date_range_with_limits."""
+ self.check_by_date_range_with_limits('id', set_id_field=False)
+
+ def test_Day_with_filled_gaps(self) -> None:
+ """Test .with_filled_gaps."""
+
+ def test(range_indexes: tuple[int, int], indexes_to_provide: list[int]
+ ) -> None:
+ start_i, end_i = range_indexes
+ days_provided = []
+ days_expected = days_sans_comment[:]
+ for i in indexes_to_provide:
+ day_with_comment = days_with_comment[i]
+ days_provided += [day_with_comment]
+ days_expected[i] = day_with_comment
+ days_expected = days_expected[start_i:end_i+1]
+ start, end = dates[start_i], dates[end_i]
+ days_result = self.checked_class.with_filled_gaps(days_provided,
+ start, end)
+ self.assertEqual(days_result, days_expected)
+
+        # for provided Days we use those from days_with_comment, to distinguish
+        # them from same-dated mere filler Days by their lack of a comment
+        # (i.e. identity with the Day at the same position in days_sans_comment)
+ dates = [f'2024-02-0{n+1}' for n in range(9)]
+ days_with_comment = [Day(date, comment=date[-1:]) for date in dates]
+ days_sans_comment = [Day(date, comment='') for date in dates]
+ # check provided Days recognizable in (full-range) interval
+ test((0, 8), [0, 4, 8])
+ # check limited range, but limiting Days provided
+ test((2, 6), [2, 5, 6])
+ # check Days within range but beyond provided Days also filled in
+ test((1, 7), [2, 5])
+ # check provided Days beyond range ignored
+ test((3, 5), [1, 2, 4, 6, 7])
+ # check inversion of start_date and end_date returns empty list
+ test((5, 3), [2, 4, 6])
+ # check empty provision still creates filler elements in interval
+ test((3, 5), [])
+ # check single-element selection creating only filler beyond provided
+ test((1, 1), [2, 4, 6])
+ # check (un-saved) filler Days don't show up in cache or DB
+ # dates = [f'2024-02-0{n}' for n in range(1, 6)]
+ day = Day(dates[3])
+ day.save(self.db_conn)
+ self.checked_class.with_filled_gaps([day], dates[0], dates[-1])
+ self.check_identity_with_cache_and_db([day])
+ # check 'today', 'yesterday', 'tomorrow' are interpreted
+ yesterday = Day('yesterday')
+ tomorrow = Day('tomorrow')
+ today = Day('today')
+ result = self.checked_class.with_filled_gaps([today], 'yesterday',
+ 'tomorrow')
+ self.assertEqual(result, [yesterday, today, tomorrow])
class TestsWithServer(TestCaseWithServer):
@classmethod
def GET_calendar_dict(cls, start: int, end: int) -> dict[str, object]:
- """Return JSON of GET /calendar to expect."""
+ """Return JSON of GET /calendar to expect.
+
+        NB: the list of date strings under key 'days' implies/expects a
+        continuous (= gap-free) alphabetical ordering of dates, by virtue of
+        range(start, end+1) and date_in_n_days (tested in
+        TestsSansDB.test_date_in_n_days).
+ """
today_date = date_in_n_days(0)
start_date = date_in_n_days(start)
end_date = date_in_n_days(end)
# check illegal date range delimiters
self.check_get('/calendar?start=foo', 400)
self.check_get('/calendar?end=foo', 400)
- # check default range without saved days
+ # check default range for expected selection/order without saved days
expected = self.GET_calendar_dict(-1, 366)
self.check_json_get('/calendar', expected)
self.check_json_get('/calendar?start=&end=', expected)
- # check named days as delimiters
+ # check with named days as delimiters
expected = self.GET_calendar_dict(-1, +1)
self.check_json_get('/calendar?start=yesterday&end=tomorrow', expected)
# check zero-element range
expected = self.GET_calendar_dict(+1, 0)
self.check_json_get('/calendar?start=tomorrow&end=today', expected)
- # check saved day shows up in results with proven by its comment
+ # check saved day shows up in results, proven by its comment
post_day: dict[str, object] = {'day_comment': 'foo', 'make_type': ''}
date1 = date_in_n_days(-2)
self._post_day(f'date={date1}', post_day)
class TestsSansDB(TestCaseSansDB):
"""Module tests not requiring DB setup."""
checked_class = Process
- versioned_defaults_to_test = {'title': 'UNNAMED', 'description': '',
- 'effort': 1.0}
class TestsSansDBProcessStep(TestCaseSansDB):
"""Module tests not requiring DB setup."""
checked_class = ProcessStep
- default_init_args = [2, 3, 4]
+ default_init_kwargs = {'owner_id': 2, 'step_process_id': 3,
+ 'parent_step_id': 4}
class TestsWithDB(TestCaseWithDB):
"""Module tests requiring DB setup."""
checked_class = Process
- test_versioneds = {'title': str, 'description': str, 'effort': float}
def three_processes(self) -> tuple[Process, Process, Process]:
"""Return three saved processes."""
class TestsWithDB(TestCaseWithDB, TestCaseSansDB):
"""Tests requiring DB, but not server setup.
- NB: We subclass TestCaseSansDB too, to pull in its .test_id_validation,
- which for Todo wouldn't run without a DB being set up due to the need for
- Processes with set IDs.
+ NB: We subclass TestCaseSansDB too, to run any tests there that due to any
+ Todo requiring a _saved_ Process wouldn't run without a DB.
"""
checked_class = Todo
default_init_kwargs = {'process': None, 'is_done': False,
'date': '2024-01-01'}
- # solely used for TestCaseSansDB.test_id_setting
- default_init_args = [None, False, '2024-01-01']
def setUp(self) -> None:
super().setUp()
self.cond2 = Condition(None)
self.cond2.save(self.db_conn)
self.default_init_kwargs['process'] = self.proc
- self.default_init_args[0] = self.proc
def test_Todo_init(self) -> None:
"""Test creation of Todo and what they default to."""
with self.assertRaises(BadFormatException):
self.assertEqual(Todo.by_date(self.db_conn, 'foo'), [])
+ def test_Todo_by_date_range_with_limits(self) -> None:
+ """Test .by_date_range_with_limits."""
+ self.check_by_date_range_with_limits('day')
+
def test_Todo_on_conditions(self) -> None:
"""Test effect of Todos on Conditions."""
assert isinstance(self.cond1.id_, int)
from typing import Mapping, Any, Callable
from threading import Thread
from http.client import HTTPConnection
+from datetime import datetime, timedelta
+from time import sleep
from json import loads as json_loads
from urllib.parse import urlencode
from uuid import uuid4
from plomtask.processes import Process, ProcessStep
from plomtask.conditions import Condition
from plomtask.days import Day
+from plomtask.dating import DATE_FORMAT
from plomtask.todos import Todo
+from plomtask.versioned_attributes import VersionedAttribute, TIMESTAMP_FMT
from plomtask.exceptions import NotFoundException, HandledException
-def _within_checked_class(f: Callable[..., None]) -> Callable[..., None]:
- def wrapper(self: TestCase) -> None:
- if hasattr(self, 'checked_class'):
- f(self)
- return wrapper
+VERSIONED_VALS: dict[str,
+ list[str] | list[float]] = {'str': ['A', 'B'],
+ 'float': [0.3, 1.1]}
-class TestCaseSansDB(TestCase):
- """Tests requiring no DB setup."""
+class TestCaseAugmented(TestCase):
+ """Tester core providing helpful basic internal decorators and methods."""
checked_class: Any
- default_init_args: list[Any] = []
- versioned_defaults_to_test: dict[str, str | float] = {}
- legal_ids = [1, 5]
- illegal_ids = [0]
+ default_init_kwargs: dict[str, Any] = {}
+
+ @staticmethod
+ def _run_if_checked_class(f: Callable[..., None]) -> Callable[..., None]:
+ def wrapper(self: TestCase) -> None:
+ if hasattr(self, 'checked_class'):
+ f(self)
+ return wrapper
+
+ @classmethod
+ def _run_on_versioned_attributes(cls,
+ f: Callable[..., None]
+ ) -> Callable[..., None]:
+ @cls._run_if_checked_class
+ def wrapper(self: TestCase) -> None:
+ assert isinstance(self, TestCaseAugmented)
+ for attr_name in self.checked_class.to_save_versioned():
+ default = self.checked_class.versioned_defaults[attr_name]
+ owner = self.checked_class(None, **self.default_init_kwargs)
+ attr = getattr(owner, attr_name)
+ to_set = VERSIONED_VALS[attr.value_type_name]
+ f(self, owner, attr_name, attr, default, to_set)
+ return wrapper
+
+ @classmethod
+ def _make_from_defaults(cls, id_: float | str | None) -> Any:
+ return cls.checked_class(id_, **cls.default_init_kwargs)
+
+
+class TestCaseSansDB(TestCaseAugmented):
+ """Tests requiring no DB setup."""
+ legal_ids: list[str] | list[int] = [1, 5]
+ illegal_ids: list[str] | list[int] = [0]
- @_within_checked_class
+ @TestCaseAugmented._run_if_checked_class
def test_id_validation(self) -> None:
"""Test .id_ validation/setting."""
for id_ in self.illegal_ids:
with self.assertRaises(HandledException):
- self.checked_class(id_, *self.default_init_args)
+ self._make_from_defaults(id_)
for id_ in self.legal_ids:
- obj = self.checked_class(id_, *self.default_init_args)
+ obj = self._make_from_defaults(id_)
self.assertEqual(obj.id_, id_)
- @_within_checked_class
- def test_versioned_defaults(self) -> None:
- """Test defaults of VersionedAttributes."""
- id_ = self.legal_ids[0]
- obj = self.checked_class(id_, *self.default_init_args)
- for k, v in self.versioned_defaults_to_test.items():
- self.assertEqual(getattr(obj, k).newest, v)
-
-
-class TestCaseWithDB(TestCase):
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_versioned_set(self,
+ _: Any,
+ __: str,
+ attr: VersionedAttribute,
+ default: str | float,
+ to_set: list[str | float]
+ ) -> None:
+ """Test VersionedAttribute.set() behaves as expected."""
+ attr.set(default)
+ self.assertEqual(list(attr.history.values()), [default])
+ # check same value does not get set twice in a row,
+ # and that not even its timestamp get updated
+ timestamp = list(attr.history.keys())[0]
+ attr.set(default)
+ self.assertEqual(list(attr.history.values()), [default])
+ self.assertEqual(list(attr.history.keys())[0], timestamp)
+ # check that different value _will_ be set/added
+ attr.set(to_set[0])
+ timesorted_vals = [attr.history[t] for
+ t in sorted(attr.history.keys())]
+ expected = [default, to_set[0]]
+ self.assertEqual(timesorted_vals, expected)
+ # check that a previously used value can be set if not most recent
+ attr.set(default)
+ timesorted_vals = [attr.history[t] for
+ t in sorted(attr.history.keys())]
+ expected = [default, to_set[0], default]
+ self.assertEqual(timesorted_vals, expected)
+ # again check for same value not being set twice in a row, even for
+ # later items
+ attr.set(to_set[1])
+ timesorted_vals = [attr.history[t] for
+ t in sorted(attr.history.keys())]
+ expected = [default, to_set[0], default, to_set[1]]
+ self.assertEqual(timesorted_vals, expected)
+ attr.set(to_set[1])
+ self.assertEqual(timesorted_vals, expected)
+
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_versioned_newest(self,
+ _: Any,
+ __: str,
+ attr: VersionedAttribute,
+ default: str | float,
+ to_set: list[str | float]
+ ) -> None:
+ """Test VersionedAttribute.newest."""
+ # check .newest on empty history returns .default
+ self.assertEqual(attr.newest, default)
+ # check newest element always returned
+ for v in [to_set[0], to_set[1]]:
+ attr.set(v)
+ self.assertEqual(attr.newest, v)
+ # check newest element returned even if also early value
+ attr.set(default)
+ self.assertEqual(attr.newest, default)
+
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_versioned_at(self,
+ _: Any,
+ __: str,
+ attr: VersionedAttribute,
+ default: str | float,
+ to_set: list[str | float]
+ ) -> None:
+ """Test .at() returns values nearest to queried time, or default."""
+        # check .at() returns default on empty history
+ timestamp_a = datetime.now().strftime(TIMESTAMP_FMT)
+ self.assertEqual(attr.at(timestamp_a), default)
+ # check value exactly at timestamp returned
+ attr.set(to_set[0])
+ timestamp_b = list(attr.history.keys())[0]
+ self.assertEqual(attr.at(timestamp_b), to_set[0])
+ # check earliest value returned if exists, rather than default
+ self.assertEqual(attr.at(timestamp_a), to_set[0])
+ # check reverts to previous value for timestamps not indexed
+ sleep(0.00001)
+ timestamp_between = datetime.now().strftime(TIMESTAMP_FMT)
+ sleep(0.00001)
+ attr.set(to_set[1])
+ timestamp_c = sorted(attr.history.keys())[-1]
+ self.assertEqual(attr.at(timestamp_c), to_set[1])
+ self.assertEqual(attr.at(timestamp_between), to_set[0])
+ sleep(0.00001)
+ timestamp_after_c = datetime.now().strftime(TIMESTAMP_FMT)
+ self.assertEqual(attr.at(timestamp_after_c), to_set[1])
+
+
+class TestCaseWithDB(TestCaseAugmented):
"""Module tests not requiring DB setup."""
- checked_class: Any
default_ids: tuple[int | str, int | str, int | str] = (1, 2, 3)
- default_init_kwargs: dict[str, Any] = {}
- test_versioneds: dict[str, type] = {}
def setUp(self) -> None:
Condition.empty_cache()
return db_found
def _change_obj(self, obj: object) -> str:
- attr_name: str = self.checked_class.to_save[-1]
+ attr_name: str = self.checked_class.to_save_simples[-1]
attr = getattr(obj, attr_name)
new_attr: str | int | float | bool
if isinstance(attr, (int, float)):
hashes_db_found = [hash(x) for x in db_found]
self.assertEqual(sorted(hashes_content), sorted(hashes_db_found))
- @_within_checked_class
- def test_saving_versioned(self) -> None:
+ def check_by_date_range_with_limits(self,
+ date_col: str,
+ set_id_field: bool = True
+ ) -> None:
+ """Test .by_date_range_with_limits."""
+ # pylint: disable=too-many-locals
+ f = self.checked_class.by_date_range_with_limits
+ # check illegal ranges
+ legal_range = ('yesterday', 'tomorrow')
+ for i in [0, 1]:
+ for bad_date in ['foo', '2024-02-30', '2024-01-01 12:00:00']:
+ date_range = list(legal_range[:])
+ date_range[i] = bad_date
+ with self.assertRaises(HandledException):
+ f(self.db_conn, date_range, date_col)
+ # check empty, translation of 'yesterday' and 'tomorrow'
+ items, start, end = f(self.db_conn, legal_range, date_col)
+ self.assertEqual(items, [])
+ yesterday = datetime.now() + timedelta(days=-1)
+ tomorrow = datetime.now() + timedelta(days=+1)
+ self.assertEqual(start, yesterday.strftime(DATE_FORMAT))
+ self.assertEqual(end, tomorrow.strftime(DATE_FORMAT))
+ # prepare dated items for non-empty results
+ kwargs_with_date = self.default_init_kwargs.copy()
+ if set_id_field:
+ kwargs_with_date['id_'] = None
+ objs = []
+        dates = ['2024-01-01', '2024-01-02', '2024-01-04']
+        for date in dates:
+ kwargs_with_date['date'] = date
+ obj = self.checked_class(**kwargs_with_date)
+ objs += [obj]
+ # check ranges still empty before saving
+ date_range = [dates[0], dates[-1]]
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], [])
+ # check all objs displayed within closed interval
+ for obj in objs:
+ obj.save(self.db_conn)
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], objs)
+        # check that only objects within the interval are displayed
+ date_range = ['2023-12-20', '2024-01-03']
+ expected = [objs[0], objs[1]]
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], expected)
+ date_range = ['2024-01-03', '2024-01-30']
+ expected = [objs[2]]
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], expected)
+ # check that inverted interval displays nothing
+ date_range = [dates[-1], dates[0]]
+ self.assertEqual(f(self.db_conn, date_range, date_col)[0], [])
+ # check that "today" is interpreted, and single-element interval
+ today_date = datetime.now().strftime(DATE_FORMAT)
+ kwargs_with_date['date'] = today_date
+ obj_today = self.checked_class(**kwargs_with_date)
+ obj_today.save(self.db_conn)
+ date_range = ['today', 'today']
+ items, start, end = f(self.db_conn, date_range, date_col)
+ self.assertEqual(start, today_date)
+ self.assertEqual(start, end)
+ self.assertEqual(items, [obj_today])
+
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_saving_versioned_attributes(self,
+ owner: Any,
+ attr_name: str,
+ attr: VersionedAttribute,
+ _: str | float,
+ to_set: list[str | float]
+ ) -> None:
"""Test storage and initialization of versioned attributes."""
- def retrieve_attr_vals() -> list[object]:
+
+ def retrieve_attr_vals(attr: VersionedAttribute) -> list[object]:
attr_vals_saved: list[object] = []
- assert hasattr(retrieved, 'id_')
for row in self.db_conn.row_where(attr.table_name, 'parent',
- retrieved.id_):
+ owner.id_):
attr_vals_saved += [row[2]]
return attr_vals_saved
- for attr_name, type_ in self.test_versioneds.items():
- # fail saving attributes on non-saved owner
- owner = self.checked_class(None, **self.default_init_kwargs)
- vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1]
- attr = getattr(owner, attr_name)
- attr.set(vals[0])
- attr.set(vals[1])
- with self.assertRaises(NotFoundException):
- attr.save(self.db_conn)
- owner.save(self.db_conn)
- # check stored attribute is as expected
- retrieved = self._load_from_db(owner.id_)[0]
- attr = getattr(retrieved, attr_name)
- self.assertEqual(sorted(attr.history.values()), vals)
- # check owner.save() created entries in attr table
- attr_vals_saved = retrieve_attr_vals()
- self.assertEqual(vals, attr_vals_saved)
- # check setting new val to attr inconsequential to DB without save
- attr.set(vals[0])
- attr_vals_saved = retrieve_attr_vals()
- self.assertEqual(vals, attr_vals_saved)
- # check save finally adds new val
- attr.save(self.db_conn)
- attr_vals_saved = retrieve_attr_vals()
- self.assertEqual(vals + [vals[0]], attr_vals_saved)
- @_within_checked_class
+ attr.set(to_set[0])
+ # check that without attr.save() no rows in DB
+ rows = self.db_conn.row_where(attr.table_name, 'parent', owner.id_)
+ self.assertEqual([], rows)
+ # fail saving attributes on non-saved owner
+ with self.assertRaises(NotFoundException):
+ attr.save(self.db_conn)
+ # check owner.save() created entries as expected in attr table
+ owner.save(self.db_conn)
+ attr_vals_saved = retrieve_attr_vals(attr)
+ self.assertEqual([to_set[0]], attr_vals_saved)
+ # check changing attr val without save affects owner in memory …
+ attr.set(to_set[1])
+ cmp_attr = getattr(owner, attr_name)
+ self.assertEqual(to_set, list(cmp_attr.history.values()))
+ self.assertEqual(cmp_attr.history, attr.history)
+ # … but does not yet affect DB
+ attr_vals_saved = retrieve_attr_vals(attr)
+ self.assertEqual([to_set[0]], attr_vals_saved)
+ # check individual attr.save also stores new val to DB
+ attr.save(self.db_conn)
+ attr_vals_saved = retrieve_attr_vals(attr)
+ self.assertEqual(to_set, attr_vals_saved)
+
+ @TestCaseAugmented._run_if_checked_class
def test_saving_and_caching(self) -> None:
"""Test effects of .cache() and .save()."""
id1 = self.default_ids[0]
# check failure to cache without ID (if None-ID input possible)
if isinstance(id1, int):
- obj0 = self.checked_class(None, **self.default_init_kwargs)
+ obj0 = self._make_from_defaults(None)
with self.assertRaises(HandledException):
obj0.cache()
# check mere object init itself doesn't even store in cache
- obj1 = self.checked_class(id1, **self.default_init_kwargs)
+ obj1 = self._make_from_defaults(id1)
self.assertEqual(self.checked_class.get_cache(), {})
# check .cache() fills cache, but not DB
obj1.cache()
self.assertEqual(self.checked_class.get_cache(), {id1: obj1})
- db_found = self._load_from_db(id1)
- self.assertEqual(db_found, [])
+ found_in_db = self._load_from_db(id1)
+ self.assertEqual(found_in_db, [])
# check .save() sets ID (for int IDs), updates cache, and fills DB
# (expect ID to be set to id1, despite obj1 already having that as ID:
# it's generated by cursor.lastrowid on the DB table, and with obj1
# not written there, obj2 should get it first!)
id_input = None if isinstance(id1, int) else id1
- obj2 = self.checked_class(id_input, **self.default_init_kwargs)
+ obj2 = self._make_from_defaults(id_input)
obj2.save(self.db_conn)
- obj2_hash = hash(obj2)
self.assertEqual(self.checked_class.get_cache(), {id1: obj2})
- db_found += self._load_from_db(id1)
- self.assertEqual([hash(o) for o in db_found], [obj2_hash])
+ # NB: we'll only compare hashes because obj2 itself disappears on
+        # .from_table_row-triggered database reload
+ obj2_hash = hash(obj2)
+ found_in_db += self._load_from_db(id1)
+ self.assertEqual([hash(o) for o in found_in_db], [obj2_hash])
# check we cannot overwrite obj2 with obj1 despite its same ID,
# since it has disappeared now
with self.assertRaises(HandledException):
obj1.save(self.db_conn)
- @_within_checked_class
+ @TestCaseAugmented._run_if_checked_class
def test_by_id(self) -> None:
"""Test .by_id()."""
id1, id2, _ = self.default_ids
# check failure if not yet saved
- obj1 = self.checked_class(id1, **self.default_init_kwargs)
+ obj1 = self._make_from_defaults(id1)
with self.assertRaises(NotFoundException):
self.checked_class.by_id(self.db_conn, id1)
# check identity of cached and retrieved
obj1.cache()
self.assertEqual(obj1, self.checked_class.by_id(self.db_conn, id1))
# check identity of saved and retrieved
- obj2 = self.checked_class(id2, **self.default_init_kwargs)
+ obj2 = self._make_from_defaults(id2)
obj2.save(self.db_conn)
self.assertEqual(obj2, self.checked_class.by_id(self.db_conn, id2))
- @_within_checked_class
+ @TestCaseAugmented._run_if_checked_class
def test_by_id_or_create(self) -> None:
"""Test .by_id_or_create."""
# check .by_id_or_create fails if wrong class
self.checked_class.by_id(self.db_conn, item.id_)
self.assertEqual(self.checked_class(item.id_), item)
- @_within_checked_class
+ @TestCaseAugmented._run_if_checked_class
def test_from_table_row(self) -> None:
"""Test .from_table_row() properly reads in class directly from DB."""
id_ = self.default_ids[0]
- obj = self.checked_class(id_, **self.default_init_kwargs)
+ obj = self._make_from_defaults(id_)
obj.save(self.db_conn)
assert isinstance(obj.id_, type(id_))
for row in self.db_conn.row_where(self.checked_class.table_name,
'id', obj.id_):
# check .from_table_row reproduces state saved, no matter if obj
# later changed (with caching even)
+ # NB: we'll only compare hashes because obj itself disappears on
+ # .from_table_row-triggered database reload
hash_original = hash(obj)
attr_name = self._change_obj(obj)
obj.cache()
# check cache contains what .from_table_row just produced
self.assertEqual({retrieved.id_: retrieved},
self.checked_class.get_cache())
- # check .from_table_row also reads versioned attributes from DB
- for attr_name, type_ in self.test_versioneds.items():
- owner = self.checked_class(None)
- vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1]
- attr = getattr(owner, attr_name)
- attr.set(vals[0])
- attr.set(vals[1])
- owner.save(self.db_conn)
- for row in self.db_conn.row_where(owner.table_name, 'id',
+
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_versioned_history_from_row(self,
+ owner: Any,
+ _: str,
+ attr: VersionedAttribute,
+ default: str | float,
+ to_set: list[str | float]
+ ) -> None:
+        """Test VersionedAttribute.history_from_row() knows its DB rows."""
+ attr.set(to_set[0])
+ attr.set(to_set[1])
+ owner.save(self.db_conn)
+ # make empty VersionedAttribute, fill from rows, compare to owner's
+ for row in self.db_conn.row_where(owner.table_name, 'id', owner.id_):
+ loaded_attr = VersionedAttribute(owner, attr.table_name, default)
+ for row in self.db_conn.row_where(attr.table_name, 'parent',
owner.id_):
- retrieved = owner.__class__.from_table_row(self.db_conn, row)
- attr = getattr(retrieved, attr_name)
- self.assertEqual(sorted(attr.history.values()), vals)
+ loaded_attr.history_from_row(row)
+ self.assertEqual(len(attr.history.keys()),
+ len(loaded_attr.history.keys()))
+ for timestamp, value in attr.history.items():
+ self.assertEqual(value, loaded_attr.history[timestamp])
- @_within_checked_class
+ @TestCaseAugmented._run_if_checked_class
def test_all(self) -> None:
"""Test .all() and its relation to cache and savings."""
- id_1, id_2, id_3 = self.default_ids
- item1 = self.checked_class(id_1, **self.default_init_kwargs)
- item2 = self.checked_class(id_2, **self.default_init_kwargs)
- item3 = self.checked_class(id_3, **self.default_init_kwargs)
+ id1, id2, id3 = self.default_ids
+ item1 = self._make_from_defaults(id1)
+ item2 = self._make_from_defaults(id2)
+ item3 = self._make_from_defaults(id3)
# check .all() returns empty list on un-cached items
self.assertEqual(self.checked_class.all(self.db_conn), [])
# check that all() shows only cached/saved items
self.assertEqual(sorted(self.checked_class.all(self.db_conn)),
sorted([item1, item2, item3]))
- @_within_checked_class
+ @TestCaseAugmented._run_if_checked_class
def test_singularity(self) -> None:
"""Test pointers made for single object keep pointing to it."""
id1 = self.default_ids[0]
- obj = self.checked_class(id1, **self.default_init_kwargs)
+ obj = self._make_from_defaults(id1)
obj.save(self.db_conn)
# change object, expect retrieved through .by_id to carry change
attr_name = self._change_obj(obj)
retrieved = self.checked_class.by_id(self.db_conn, id1)
self.assertEqual(new_attr, getattr(retrieved, attr_name))
- @_within_checked_class
- def test_versioned_singularity_title(self) -> None:
- """Test singularity of VersionedAttributes on saving (with .title)."""
- if 'title' in self.test_versioneds:
- obj = self.checked_class(None)
- obj.save(self.db_conn)
- assert isinstance(obj.id_, int)
- # change obj, expect retrieved through .by_id to carry change
- obj.title.set('named')
- retrieved = self.checked_class.by_id(self.db_conn, obj.id_)
- self.assertEqual(obj.title.history, retrieved.title.history)
-
- @_within_checked_class
+ @TestCaseAugmented._run_on_versioned_attributes
+ def test_versioned_singularity(self,
+ owner: Any,
+ attr_name: str,
+ attr: VersionedAttribute,
+ _: str | float,
+ to_set: list[str | float]
+ ) -> None:
+ """Test singularity of VersionedAttributes on saving."""
+ owner.save(self.db_conn)
+ # change obj, expect retrieved through .by_id to carry change
+ attr.set(to_set[0])
+ retrieved = self.checked_class.by_id(self.db_conn, owner.id_)
+ attr_retrieved = getattr(retrieved, attr_name)
+ self.assertEqual(attr.history, attr_retrieved.history)
+
+ @TestCaseAugmented._run_if_checked_class
def test_remove(self) -> None:
"""Test .remove() effects on DB and cache."""
id_ = self.default_ids[0]
- obj = self.checked_class(id_, **self.default_init_kwargs)
+ obj = self._make_from_defaults(id_)
# check removal only works after saving
with self.assertRaises(HandledException):
obj.remove(self.db_conn)
self.assertEqual(self.conn.getresponse().status, expected_code)
def check_post(self, data: Mapping[str, object], target: str,
- expected_code: int, redirect_location: str = '') -> None:
+ expected_code: int = 302, redir: str = '') -> None:
"""Check that POST of data to target yields expected_code."""
encoded_form_data = urlencode(data, doseq=True).encode('utf-8')
headers = {'Content-Type': 'application/x-www-form-urlencoded',
self.conn.request('POST', target,
body=encoded_form_data, headers=headers)
if 302 == expected_code:
- if redirect_location == '':
- redirect_location = target
- self.check_redirect(redirect_location)
+ redir = target if redir == '' else redir
+ self.check_redirect(redir)
else:
self.assertEqual(self.conn.getresponse().status, expected_code)
"""POST basic Process."""
if not form_data:
form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.1}
- self.check_post(form_data, f'/process?id={id_}', 302,
- f'/process?id={id_}')
+ self.check_post(form_data, f'/process?id={id_}',
+ redir=f'/process?id={id_}')
return form_data
def check_json_get(self, path: str, expected: dict[str, object]) -> None:
timestamp keys of VersionedAttribute history keys into integers
counting chronologically forward from 0.
"""
+
def rewrite_history_keys_in(item: Any) -> Any:
if isinstance(item, dict):
if '_versioned' in item.keys():
elif isinstance(item, list):
item[:] = [rewrite_history_keys_in(i) for i in item]
return item
+
self.conn.request('GET', path)
response = self.conn.getresponse()
self.assertEqual(response.status, 200)
+++ /dev/null
-""""Test Versioned Attributes in the abstract."""
-from unittest import TestCase
-from time import sleep
-from datetime import datetime
-from tests.utils import TestCaseWithDB
-from plomtask.versioned_attributes import VersionedAttribute, TIMESTAMP_FMT
-from plomtask.db import BaseModel
-
-SQL_TEST_TABLE_STR = '''
-CREATE TABLE versioned_tests (
- parent INTEGER NOT NULL,
- timestamp TEXT NOT NULL,
- value TEXT NOT NULL,
- PRIMARY KEY (parent, timestamp)
-);
-'''
-SQL_TEST_TABLE_FLOAT = '''
-CREATE TABLE versioned_tests (
- parent INTEGER NOT NULL,
- timestamp TEXT NOT NULL,
- value REAL NOT NULL,
- PRIMARY KEY (parent, timestamp)
-);
-'''
-
-
-class TestParentType(BaseModel[int]):
- """Dummy abstracting whatever may use VersionedAttributes."""
-
-
-class TestsSansDB(TestCase):
- """Tests not requiring DB setup."""
-
- def test_VersionedAttribute_set(self) -> None:
- """Test .set() behaves as expected."""
- # check value gets set even if already is the default
- attr = VersionedAttribute(None, '', 'A')
- attr.set('A')
- self.assertEqual(list(attr.history.values()), ['A'])
- # check same value does not get set twice in a row,
- # and that not even its timestamp get updated
- timestamp = list(attr.history.keys())[0]
- attr.set('A')
- self.assertEqual(list(attr.history.values()), ['A'])
- self.assertEqual(list(attr.history.keys())[0], timestamp)
- # check that different value _will_ be set/added
- attr.set('B')
- self.assertEqual(sorted(attr.history.values()), ['A', 'B'])
- # check that a previously used value can be set if not most recent
- attr.set('A')
- self.assertEqual(sorted(attr.history.values()), ['A', 'A', 'B'])
- # again check for same value not being set twice in a row, even for
- # later items
- attr.set('D')
- self.assertEqual(sorted(attr.history.values()), ['A', 'A', 'B', 'D'])
- attr.set('D')
- self.assertEqual(sorted(attr.history.values()), ['A', 'A', 'B', 'D'])
-
- def test_VersionedAttribute_newest(self) -> None:
- """Test .newest returns newest element, or default on empty."""
- attr = VersionedAttribute(None, '', 'A')
- self.assertEqual(attr.newest, 'A')
- attr.set('B')
- self.assertEqual(attr.newest, 'B')
- attr.set('C')
-
- def test_VersionedAttribute_at(self) -> None:
- """Test .at() returns values nearest to queried time, or default."""
- # check .at() return default on empty history
- attr = VersionedAttribute(None, '', 'A')
- timestamp_a = datetime.now().strftime(TIMESTAMP_FMT)
- self.assertEqual(attr.at(timestamp_a), 'A')
- # check value exactly at timestamp returned
- attr.set('B')
- timestamp_b = list(attr.history.keys())[0]
- self.assertEqual(attr.at(timestamp_b), 'B')
- # check earliest value returned if exists, rather than default
- self.assertEqual(attr.at(timestamp_a), 'B')
- # check reverts to previous value for timestamps not indexed
- sleep(0.00001)
- timestamp_between = datetime.now().strftime(TIMESTAMP_FMT)
- sleep(0.00001)
- attr.set('C')
- timestamp_c = sorted(attr.history.keys())[-1]
- self.assertEqual(attr.at(timestamp_c), 'C')
- self.assertEqual(attr.at(timestamp_between), 'B')
- sleep(0.00001)
- timestamp_after_c = datetime.now().strftime(TIMESTAMP_FMT)
- self.assertEqual(attr.at(timestamp_after_c), 'C')
-
-
-class TestsWithDBStr(TestCaseWithDB):
- """Module tests requiring DB setup."""
- default_vals: list[str | float] = ['A', 'B', 'C']
- init_sql = SQL_TEST_TABLE_STR
-
- def setUp(self) -> None:
- super().setUp()
- self.db_conn.exec(self.init_sql)
- self.test_parent = TestParentType(1)
- self.attr = VersionedAttribute(self.test_parent,
- 'versioned_tests', self.default_vals[0])
-
- def test_VersionedAttribute_save(self) -> None:
- """Test .save() to write to DB."""
- # check mere .set() calls do not by themselves reflect in the DB
- self.attr.set(self.default_vals[1])
- self.assertEqual([],
- self.db_conn.row_where('versioned_tests',
- 'parent', 1))
- # check .save() makes history appear in DB
- self.attr.save(self.db_conn)
- vals_found = []
- for row in self.db_conn.row_where('versioned_tests', 'parent', 1):
- vals_found += [row[2]]
- self.assertEqual([self.default_vals[1]], vals_found)
- # check .save() also updates history in DB
- self.attr.set(self.default_vals[2])
- self.attr.save(self.db_conn)
- vals_found = []
- for row in self.db_conn.row_where('versioned_tests', 'parent', 1):
- vals_found += [row[2]]
- self.assertEqual([self.default_vals[1], self.default_vals[2]],
- sorted(vals_found))
-
- def test_VersionedAttribute_history_from_row(self) -> None:
- """"Test .history_from_row() properly interprets DB rows."""
- self.attr.set(self.default_vals[1])
- self.attr.set(self.default_vals[2])
- self.attr.save(self.db_conn)
- loaded_attr = VersionedAttribute(self.test_parent, 'versioned_tests',
- self.default_vals[0])
- for row in self.db_conn.row_where('versioned_tests', 'parent', 1):
- loaded_attr.history_from_row(row)
- for timestamp, value in self.attr.history.items():
- self.assertEqual(value, loaded_attr.history[timestamp])
- self.assertEqual(len(self.attr.history.keys()),
- len(loaded_attr.history.keys()))
-
-
-class TestsWithDBFloat(TestsWithDBStr):
- """Module tests requiring DB setup."""
- default_vals: list[str | float] = [0.9, 1.1, 2]
- init_sql = SQL_TEST_TABLE_FLOAT