from typing import Any
from sqlite3 import Row
from datetime import datetime, timedelta
-from plomtask.exceptions import HandledException
from plomtask.db import DatabaseConnection, BaseModel
from plomtask.todos import Todo
from plomtask.dating import (DATE_FORMAT, valid_date)
table_name = 'days'
to_save = ['comment']
- def __init__(self,
- date: str,
- comment: str = '',
- init_empty_todo_list: bool = False
- ) -> None:
+ def __init__(self, date: str, comment: str = '') -> None:
id_ = valid_date(date)
super().__init__(id_)
self.datetime = datetime.strptime(self.date, DATE_FORMAT)
self.comment = comment
- self._todos: list[Todo] | None = [] if init_empty_todo_list else None
+ self.todos: list[Todo] = []
def __lt__(self, other: Day) -> bool:
return self.date < other.date
def from_table_row(cls, db_conn: DatabaseConnection, row: Row | list[Any]
) -> Day:
"""Make from DB row, with linked Todos."""
- # pylint: disable=protected-access
- # (since on ._todo we're only meddling within cls)
day = super().from_table_row(db_conn, row)
assert isinstance(day.id_, str)
- day._todos = Todo.by_date(db_conn, day.id_)
+ day.todos = Todo.by_date(db_conn, day.id_)
return day
@classmethod
def by_id(cls,
db_conn: DatabaseConnection, id_: str | None,
create: bool = False,
- init_empty_todo_list: bool = False
) -> Day:
- """Extend BaseModel.by_id with init_empty_todo_list flag."""
- # pylint: disable=protected-access
- # (since on ._todo we're only meddling within cls)
+ """Extend BaseModel.by_id checking for new/lost .todos."""
day = super().by_id(db_conn, id_, create)
- if init_empty_todo_list and day._todos is None:
- day._todos = []
+ assert day.id_ is not None
+ if day.id_ in Todo.days_to_update:
+ Todo.days_to_update.remove(day.id_)
+ day.todos = Todo.by_date(db_conn, day.id_)
return day
@classmethod
return days
days.sort()
if start_date not in [d.date for d in days]:
- days[:] = [Day(start_date, init_empty_todo_list=True)] + days
+ days[:] = [Day(start_date)] + days
if end_date not in [d.date for d in days]:
- days += [Day(end_date, init_empty_todo_list=True)]
+ days += [Day(end_date)]
if len(days) > 1:
gapless_days = []
for i, day in enumerate(days):
gapless_days += [day]
if i < len(days) - 1:
while day.next_date != days[i+1].date:
- day = Day(day.next_date, init_empty_todo_list=True)
+ day = Day(day.next_date)
gapless_days += [day]
days[:] = gapless_days
return days
next_datetime = self.datetime + timedelta(days=1)
return next_datetime.strftime(DATE_FORMAT)
- @property
- def todos(self) -> list[Todo]:
- """Return self.todos if initialized, else raise Exception."""
- if self._todos is None:
- msg = 'Trying to return from un-initialized Day.todos.'
- raise HandledException(msg)
- return list(self._todos)
-
@property
def calendarized_todos(self) -> list[Todo]:
"""Return only those of self.todos that have .calendarize set."""
@property
def _user_version(self) -> int:
"""Get DB user_version."""
- # pylint: disable=protected-access
- # (since we remain within class)
- return self.__class__._get_version_of_db(self.path)
+ return self._get_version_of_db(self.path)
def _validate_schema(self) -> None:
"""Compare found schema with what's stored at PATH_DB_SCHEMA."""
id_: None | BaseModelId
cache_: dict[BaseModelId, Self]
to_search: list[str] = []
+ _exists = True
def __init__(self, id_: BaseModelId | None) -> None:
if isinstance(id_, int) and id_ < 1:
return self.id_ < other.id_
# cache management
-
- @classmethod
- def _get_cached(cls: type[BaseModelInstance],
- id_: BaseModelId) -> BaseModelInstance | None:
- """Get object of id_ from class's cache, or None if not found."""
- # pylint: disable=consider-iterating-dictionary
- cache = cls.get_cache()
- if id_ in cache.keys():
- obj = cache[id_]
- assert isinstance(obj, cls)
- return obj
- return None
+ # (we primarily use the cache to ensure we work on the same object in
+ # memory no matter where and how we retrieve it, e.g. we don't want
+ # .by_id() calls to create a new object each time, but rather a pointer
+ # to the one already instantiated)
+
+ def __getattribute__(self, name: str) -> Any:
+ """Ensure fail if ._disappear() was called, except to check ._exists"""
+ if name != '_exists' and not super().__getattribute__('_exists'):
+ raise HandledException('Object does not exist.')
+ return super().__getattribute__(name)
+
+ def _disappear(self) -> None:
+ """Invalidate object, make future use raise exceptions."""
+ assert self.id_ is not None
+ if self._get_cached(self.id_):
+ self._uncache()
+ to_kill = list(self.__dict__.keys())
+ for attr in to_kill:
+ delattr(self, attr)
+ self._exists = False
@classmethod
def empty_cache(cls) -> None:
cls.cache_ = d
return cls.cache_
- def cache(self) -> None:
- """Update object in class's cache."""
+ @classmethod
+ def _get_cached(cls: type[BaseModelInstance],
+ id_: BaseModelId) -> BaseModelInstance | None:
+ """Get object of id_ from class's cache, or None if not found."""
+ # pylint: disable=consider-iterating-dictionary
+ cache = cls.get_cache()
+ if id_ in cache.keys():
+ obj = cache[id_]
+ assert isinstance(obj, cls)
+ return obj
+ return None
+
+ def _cache(self) -> None:
+ """Update object in class's cache.
+
+ Also calls ._disappear if cache holds older reference to object of same
+ ID, but different memory address, to avoid doing anything with
+ dangling leftovers.
+ """
if self.id_ is None:
raise HandledException('Cannot cache object without ID.')
- cache = self.__class__.get_cache()
+ cache = self.get_cache()
+ old_cached = self._get_cached(self.id_)
+ if old_cached and id(old_cached) != id(self):
+ # pylint: disable=protected-access
+ # (cause we remain within the class)
+ old_cached._disappear()
cache[self.id_] = self
- def uncache(self) -> None:
+ def _uncache(self) -> None:
"""Remove self from cache."""
if self.id_ is None:
raise HandledException('Cannot un-cache object without ID.')
- cache = self.__class__.get_cache()
+ cache = self.get_cache()
del cache[self.id_]
# object retrieval and generation
# pylint: disable=unused-argument
db_conn: DatabaseConnection,
row: Row | list[Any]) -> BaseModelInstance:
- """Make from DB row, write to DB cache."""
+ """Make from DB row, update DB cache with it."""
obj = cls(*row)
- obj.cache()
+ obj._cache()
return obj
@classmethod
if not obj:
for row in db_conn.row_where(cls.table_name, 'id', id_):
obj = cls.from_table_row(db_conn, row)
- obj.cache()
break
if obj:
return obj
values)
if not isinstance(self.id_, str):
self.id_ = cursor.lastrowid # type: ignore[assignment]
- self.cache()
+ self._cache()
for attr_name in self.to_save_versioned:
getattr(self, attr_name).save(db_conn)
for table, column, attr_name, key_index in self.to_save_relations:
def remove(self, db_conn: DatabaseConnection) -> None:
"""Remove from DB and cache, including dependencies."""
- # pylint: disable=protected-access
- # (since we remain within class)
- if self.id_ is None or self.__class__._get_cached(self.id_) is None:
+ if self.id_ is None or self._get_cached(self.id_) is None:
raise HandledException('cannot remove unsaved item')
for attr_name in self.to_save_versioned:
getattr(self, attr_name).remove(db_conn)
for table, column, attr_name, _ in self.to_save_relations:
db_conn.delete_where(table, column, self.id_)
- self.uncache()
+ self._uncache()
db_conn.delete_where(self.table_name, 'id', self.id_)
+ self._disappear()
msg = f'{not_found_msg}: {self._site}'
raise NotFoundException(msg)
except HandledException as error:
- html = self.server.jinja.\
- get_template('msg.html').render(msg=error)
+ for cls in (Day, Todo, Condition, Process, ProcessStep):
+ assert hasattr(cls, 'empty_cache')
+ cls.empty_cache()
+ tmpl = self.server.jinja.get_template('msg.html')
+ html = tmpl.render(msg=error)
self._send_html(html, error.http_code)
finally:
self.conn.close()
def do_GET_day(self) -> dict[str, object]:
"""Show single Day of ?date=."""
date = self._params.get_str('date', date_in_n_days(0))
- day = Day.by_id(self.conn, date, create=True,
- init_empty_todo_list=True)
+ day = Day.by_id(self.conn, date, create=True)
make_type = self._params.get_str('make_type')
conditions_present = []
enablers_for = {}
if len(efforts) > 0:
todo.effort = float(efforts[i]) if efforts[i] else None
todo.save(self.conn)
- for condition in todo.enables:
- condition.save(self.conn)
- for condition in todo.disables:
- condition.save(self.conn)
return f'/day?date={date}&make_type={make_type}'
def do_POST_todo(self) -> str:
todo.calendarize = len(self._form_data.get_all_str('calendarize')) > 0
todo.comment = self._form_data.get_str('comment', ignore_strict=True)
todo.save(self.conn)
- for condition in todo.enables:
- condition.save(self.conn)
- for condition in todo.disables:
- condition.save(self.conn)
return f'/todo?id={todo.id_}'
def do_POST_process_descriptions(self) -> str:
None)]
except ValueError:
new_step_title = step_identifier
- process.uncache()
process.set_steps(self.conn, steps)
process.set_step_suppressions(self.conn,
self._form_data.
get_all_int('suppresses'))
- process.save(self.conn)
owners_to_set = []
new_owner_title = None
for owner_identifier in self._form_data.get_all_str('step_of'):
elif new_owner_title:
title_b64_encoded = b64encode(new_owner_title.encode()).decode()
params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
+ process.save(self.conn)
return f'/process?{params}'
def do_POST_condition_descriptions(self) -> str:
def from_table_row(cls, db_conn: DatabaseConnection,
row: Row | list[Any]) -> Process:
"""Make from DB row, with dependencies."""
- # pylint: disable=no-member
process = super().from_table_row(db_conn, row)
assert isinstance(process.id_, int)
for name in ('title', 'description', 'effort'):
table = f'process_{name}s'
for row_ in db_conn.row_where(table, 'parent', process.id_):
getattr(process, name).history_from_row(row_)
- for row_ in db_conn.row_where('process_steps', 'owner',
- process.id_):
- step = ProcessStep.from_table_row(db_conn, row_)
- process.explicit_steps += [step]
- for row_ in db_conn.row_where('process_step_suppressions', 'process',
- process.id_):
- step = ProcessStep.by_id(db_conn, row_[1])
- process.suppressed_steps += [step]
for name in ('conditions', 'blockers', 'enables', 'disables'):
table = f'process_{name}'
assert isinstance(process.id_, int)
'process', process.id_):
target = getattr(process, name)
target += [Condition.by_id(db_conn, c_id)]
+ for row_ in db_conn.row_where('process_steps', 'owner', process.id_):
+ step = ProcessStep.from_table_row(db_conn, row_)
+ process.explicit_steps += [step]
+ for row_ in db_conn.row_where('process_step_suppressions', 'process',
+ process.id_):
+ step = ProcessStep.by_id(db_conn, row_[1])
+ process.suppressed_steps += [step]
process.n_owners = len(process.used_as_step_by(db_conn))
return process
walk_steps(step)
assert isinstance(self.id_, int)
- for step in self.explicit_steps:
- step.uncache()
- self.explicit_steps = []
- db_conn.delete_where('process_steps', 'owner', self.id_)
- for step in steps:
- step.save(db_conn)
+ for step in [s for s in self.explicit_steps if s not in steps]:
+ step.remove(db_conn)
+ for step in [s for s in steps if s not in self.explicit_steps]:
if step.parent_step_id is not None:
try:
parent_step = ProcessStep.by_id(db_conn,
except NotFoundException:
step.parent_step_id = None
walk_steps(step)
- self.explicit_steps += [step]
+ step.save(db_conn)
def set_owners(self, db_conn: DatabaseConnection,
owner_ids: list[int]) -> None:
self.step_process_id = step_process_id
self.parent_step_id = parent_step_id
+ def save(self, db_conn: DatabaseConnection) -> None:
+ """Remove from DB, and owner's .explicit_steps."""
+ super().save(db_conn)
+ owner = Process.by_id(db_conn, self.owner_id)
+ if self not in owner.explicit_steps:
+ for s in [s for s in owner.explicit_steps if s.id_ == self.id_]:
+ s.remove(db_conn)
+ owner.explicit_steps += [self]
+ owner.explicit_steps.sort(key=hash)
+
def remove(self, db_conn: DatabaseConnection) -> None:
"""Remove from DB, and owner's .explicit_steps."""
owner = Process.by_id(db_conn, self.owner_id)
"""Actionables."""
from __future__ import annotations
from dataclasses import dataclass
-from typing import Any
+from typing import Any, Set
from sqlite3 import Row
from plomtask.db import DatabaseConnection, BaseModel
from plomtask.processes import Process, ProcessStepsNode
('todo_children', 'parent', 'children', 0),
('todo_children', 'child', 'parents', 1)]
to_search = ['comment']
+ days_to_update: Set[str] = set()
+ children: list[Todo]
+ parents: list[Todo]
# pylint: disable=too-many-arguments
def __init__(self, id_: int | None,
self.date = valid_date(date)
self.comment = comment
self.effort = effort
- self.children: list[Todo] = []
- self.parents: list[Todo] = []
+ self.children = []
+ self.parents = []
self.calendarize = calendarize
if not self.id_:
self.calendarize = self.process.calendarize
assert isinstance(todo.id_, int)
for t_id in db_conn.column_where('todo_children', 'child',
'parent', todo.id_):
- # pylint: disable=no-member
todo.children += [cls.by_id(db_conn, t_id)]
for t_id in db_conn.column_where('todo_children', 'parent',
'child', todo.id_):
- # pylint: disable=no-member
todo.parents += [cls.by_id(db_conn, t_id)]
for name in ('conditions', 'blockers', 'enables', 'disables'):
table = f'todo_{name}'
if self.effort and self.effort < 0 and self.is_deletable:
self.remove(db_conn)
return
+ if self.id_ is None:
+ self.__class__.days_to_update.add(self.date)
super().save(db_conn)
+ for condition in self.enables + self.disables + self.conditions:
+ condition.save(db_conn)
def remove(self, db_conn: DatabaseConnection) -> None:
"""Remove from DB, including relations."""
if not self.is_deletable:
raise HandledException('Cannot remove non-deletable Todo.')
+ self.__class__.days_to_update.add(self.date)
children_to_remove = self.children[:]
parents_to_remove = self.parents[:]
for child in children_to_remove:
def test_Condition_remove(self) -> None:
"""Test .remove() effects on DB and cache."""
self.check_remove()
- c = Condition(None)
proc = Process(None)
proc.save(self.db_conn)
todo = Todo(None, proc, False, '2024-01-01')
for depender in (proc, todo):
assert hasattr(depender, 'save')
assert hasattr(depender, 'set_conditions')
+ c = Condition(None)
c.save(self.db_conn)
depender.save(self.db_conn)
depender.set_conditions(self.db_conn, [c.id_], 'conditions')
def test_Process_conditions_saving(self) -> None:
"""Test .save/.save_core."""
p, set1, set2, set3 = self.p_of_conditions()
- p.uncache()
r = Process.by_id(self.db_conn, p.id_)
self.assertEqual(sorted(r.conditions), sorted(set1))
self.assertEqual(sorted(r.enables), sorted(set2))
assert isinstance(p.id_, int)
for row in self.db_conn.row_where(self.checked_class.table_name,
'id', p.id_):
- # pylint: disable=no-member
r = Process.from_table_row(self.db_conn, row)
self.assertEqual(sorted(r.conditions), sorted(set1))
self.assertEqual(sorted(r.enables), sorted(set2))
s_p2_to_p1_first = ProcessStep(None, p1.id_, p2.id_, s_p3_to_p1.id_)
steps_p1 += [s_p2_to_p1_first]
p1.set_steps(self.db_conn, steps_p1)
- seen_3 = ProcessStepsNode(p3, None, False, {}, True)
+ seen_3 = ProcessStepsNode(p3, None, False, {}, False)
+ p1_dict[1].steps[3].seen = True
p1_dict[2].steps[4] = ProcessStepsNode(p2, s_p3_to_p1.id_, True,
{3: seen_3})
self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
s_p3_to_p2_first = ProcessStep(None, p2.id_, p3.id_, s_p3_to_p2.id_)
steps_p2 += [s_p3_to_p2_first]
p2.set_steps(self.db_conn, steps_p2)
- p1_dict[1].steps[3].steps[7] = ProcessStepsNode(p3, 3, False, {})
+ p1_dict[1].steps[3].steps[7] = ProcessStepsNode(p3, 3, False, {}, True)
p1_dict[2].steps[4].steps[3].steps[7] = ProcessStepsNode(p3, 3, False,
- {}, True)
+ {}, False)
self.assertEqual(p1.get_steps(self.db_conn, None), p1_dict)
# ensure suppressed step nodes are hidden
assert isinstance(s_p3_to_p2.id_, int)
assert isinstance(p3.id_, int)
step = ProcessStep(None, p2.id_, p1.id_, None)
p2.set_steps(self.db_conn, [step])
+ step_id = step.id_
with self.assertRaises(HandledException):
p1.remove(self.db_conn)
p2.set_steps(self.db_conn, [])
with self.assertRaises(NotFoundException):
- ProcessStep.by_id(self.db_conn, step.id_)
+ ProcessStep.by_id(self.db_conn, step_id)
p1.remove(self.db_conn)
step = ProcessStep(None, p2.id_, p3.id_, None)
+ step_id = step.id_
p2.set_steps(self.db_conn, [step])
p2.remove(self.db_conn)
with self.assertRaises(NotFoundException):
- ProcessStep.by_id(self.db_conn, step.id_)
+ ProcessStep.by_id(self.db_conn, step_id)
todo = Todo(None, p3, False, '2024-01-01')
todo.save(self.db_conn)
with self.assertRaises(HandledException):
default_init_kwargs = {'owner_id': 2, 'step_process_id': 3,
'parent_step_id': 4}
- def test_ProcessStep_from_table_row(self) -> None:
- """Test .from_table_row() properly reads in class from DB"""
- self.check_from_table_row(2, 3, None)
+ def setUp(self) -> None:
+ super().setUp()
+ p = Process(1)
+ p.save(self.db_conn)
+ p = Process(2)
+ p.save(self.db_conn)
- def test_ProcessStep_singularity(self) -> None:
- """Test pointers made for single object keep pointing to it."""
- self.check_singularity('parent_step_id', 1, 2, 3, None)
+ def test_saving_and_caching(self) -> None:
+ """Test storage and initialization of instances and attributes."""
+ self.check_saving_and_caching(id_=1, **self.default_init_kwargs)
def test_ProcessStep_remove(self) -> None:
"""Test .remove and unsetting of owner's .explicit_steps entry."""
retrieved_process = Process.by_id(self.db_conn, 1)
self.assertEqual(len(retrieved_process.explicit_steps), 1)
retrieved_step = retrieved_process.explicit_steps[0]
+ retrieved_step_id = retrieved_step.id_
self.assertEqual(retrieved_step.step_process_id, 2)
self.assertEqual(retrieved_step.owner_id, 1)
self.assertEqual(retrieved_step.parent_step_id, None)
retrieved_process = Process.by_id(self.db_conn, 1)
self.assertEqual(retrieved_process.explicit_steps, [])
with self.assertRaises(NotFoundException):
- ProcessStep.by_id(self.db_conn, retrieved_step.id_)
+ ProcessStep.by_id(self.db_conn, retrieved_step_id)
# post new first (top_level) step of process 3 to process 1
form_data_1['new_top_step'] = [3]
self.post_process(1, form_data_1)
self.assertEqual(retrieved_process.explicit_steps, [])
# post to process empty steps list but keep, expect 400
form_data_1['steps'] = []
- form_data_1['keep_step'] = [retrieved_step.id_]
+ form_data_1['keep_step'] = [retrieved_step_id]
self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
# post to process steps list with keep on non-created step, expect 400
- form_data_1['steps'] = [retrieved_step.id_]
- form_data_1['keep_step'] = [retrieved_step.id_]
+ form_data_1['steps'] = [retrieved_step_id]
+ form_data_1['keep_step'] = [retrieved_step_id]
self.check_post(form_data_1, '/process?id=1', 400, '/process?id=1')
# post to process steps list with keep and process ID, expect 200
- form_data_1[f'step_{retrieved_step.id_}_process_id'] = [2]
+ form_data_1[f'step_{retrieved_step_id}_process_id'] = [2]
self.post_process(1, form_data_1)
retrieved_process = Process.by_id(self.db_conn, 1)
self.assertEqual(len(retrieved_process.explicit_steps), 1)
retrieved_process = Process.by_id(self.db_conn, 1)
self.assertEqual(len(retrieved_process.explicit_steps), 2)
retrieved_step_0 = retrieved_process.explicit_steps[0]
- self.assertEqual(retrieved_step_0.step_process_id, 2)
+ self.assertEqual(retrieved_step_0.step_process_id, 3)
self.assertEqual(retrieved_step_0.owner_id, 1)
self.assertEqual(retrieved_step_0.parent_step_id, None)
retrieved_step_1 = retrieved_process.explicit_steps[1]
- self.assertEqual(retrieved_step_1.step_process_id, 3)
+ self.assertEqual(retrieved_step_1.step_process_id, 2)
self.assertEqual(retrieved_step_1.owner_id, 1)
self.assertEqual(retrieved_step_1.parent_step_id, None)
# post to process steps list with keeps etc., but trigger recursion
self.assertEqual(retrieved_step_1.step_process_id, 3)
self.assertEqual(retrieved_step_1.owner_id, 1)
self.assertEqual(retrieved_step_1.parent_step_id, None)
- form_data_1[f'step_{retrieved_step_1.id_}_process_id'] = [3]
# post sub-step to step
- form_data_1[f'new_step_to_{retrieved_step_1.id_}'] = [3]
+ form_data_1[f'step_{retrieved_step_0.id_}_process_id'] = [3]
+ form_data_1[f'new_step_to_{retrieved_step_0.id_}'] = [3]
self.post_process(1, form_data_1)
retrieved_process = Process.by_id(self.db_conn, 1)
self.assertEqual(len(retrieved_process.explicit_steps), 3)
todo_2 = Todo.create_with_children(self.db_conn, proc2.id_, self.date1)
self.assertEqual(3, len(todo_2.children))
self.assertEqual(todo_1, todo_2.children[0])
- self.assertEqual(self.proc, todo_2.children[1].process)
- self.assertEqual(proc3, todo_2.children[2].process)
- todo_3 = todo_2.children[2]
+ self.assertEqual(self.proc, todo_2.children[2].process)
+ self.assertEqual(proc3, todo_2.children[1].process)
+ todo_3 = todo_2.children[1]
self.assertEqual(len(todo_3.children), 1)
self.assertEqual(todo_3.children[0].process, proc4)
todo_2 = Todo(None, self.proc, False, self.date1)
todo_2.save(self.db_conn)
todo_1.add_child(todo_2)
+ todo_1_id = todo_1.id_
todo_1.remove(self.db_conn)
with self.assertRaises(NotFoundException):
- Todo.by_id(self.db_conn, todo_1.id_)
+ Todo.by_id(self.db_conn, todo_1_id)
self.assertEqual(todo_0.children, [])
self.assertEqual(todo_2.parents, [])
todo_2.comment = 'foo'
todo_1.save(self.db_conn)
Todo.by_id(self.db_conn, todo_1.id_)
todo_1.comment = ''
+ todo_1_id = todo_1.id_
todo_1.save(self.db_conn)
with self.assertRaises(NotFoundException):
- Todo.by_id(self.db_conn, todo_1.id_)
+ Todo.by_id(self.db_conn, todo_1_id)
class TestsWithServer(TestCaseWithServer):
def test_do_POST_day_todo_doneness(self) -> None:
"""Test Todo doneness can be posted to Day view."""
- form_data = self.post_process()
+ self.post_process()
form_data = {'day_comment': '', 'new_todo': [1], 'make_type': 'full'}
self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
todo = Todo.by_date(self.db_conn, '2024-01-01')[0]
for item in content:
expected_cache[item.id_] = item
self.assertEqual(self.checked_class.get_cache(), expected_cache)
+ hashes_content = [hash(x) for x in content]
db_found: list[Any] = []
for item in content:
assert isinstance(item.id_, type(self.default_ids[0]))
'id', item.id_):
db_found += [self.checked_class.from_table_row(self.db_conn,
row)]
- self.assertEqual(sorted(content), sorted(db_found))
+ hashes_db_found = [hash(x) for x in db_found]
+ self.assertEqual(sorted(hashes_content), sorted(hashes_db_found))
def check_saving_and_caching(self, **kwargs: Any) -> None:
"""Test instance.save in its core without relations."""
obj = self.checked_class(**kwargs) # pylint: disable=not-callable
# check object init itself doesn't store anything yet
self.check_storage([])
- # check saving stores in cache and DB
+ # check saving sets core attributes properly
obj.save(self.db_conn)
- self.check_storage([obj])
- # check core attributes set properly (and not unset by saving)
for key, value in kwargs.items():
self.assertEqual(getattr(obj, key), value)
+ # check saving stored properly in cache and DB
+ self.check_storage([obj])
def check_saving_of_versioned(self, attr_name: str, type_: type) -> None:
"""Test owner's versioned attributes."""
attr.set(vals[0])
attr.set(vals[1])
owner.save(self.db_conn)
- owner.uncache()
retrieved = owner.__class__.by_id(self.db_conn, owner.id_)
attr = getattr(retrieved, attr_name)
self.assertEqual(sorted(attr.history.values()), vals)
assert isinstance(obj.id_, type(self.default_ids[0]))
for row in self.db_conn.row_where(self.checked_class.table_name,
'id', obj.id_):
+ hash_original = hash(obj)
retrieved = self.checked_class.from_table_row(self.db_conn, row)
- self.assertEqual(obj, retrieved)
- self.assertEqual({obj.id_: obj}, self.checked_class.get_cache())
+ self.assertEqual(hash_original, hash(retrieved))
+ self.assertEqual({retrieved.id_: retrieved},
+ self.checked_class.get_cache())
def check_versioned_from_table_row(self, attr_name: str,
type_: type) -> None: