From 33a4db0a8a1eacee8c15eb42c3f4a56c239048a1 Mon Sep 17 00:00:00 2001 From: Christian Heller Date: Fri, 15 Mar 2024 03:10:13 +0100 Subject: [PATCH] Add forking and testing to todo.py rewrite. --- new_todo/init.sql | 4 +- new_todo/todo.py | 260 +++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 239 insertions(+), 25 deletions(-) diff --git a/new_todo/init.sql b/new_todo/init.sql index 49b52d8..1be1422 100644 --- a/new_todo/init.sql +++ b/new_todo/init.sql @@ -3,7 +3,9 @@ CREATE TABLE days ( comment TEXT NOT NULL ); CREATE TABLE templates ( - id INTEGER PRIMARY KEY + id INTEGER PRIMARY KEY, + forked_from INTEGER, + forked_at TEXT ); CREATE TABLE versioned_default_efforts ( template INTEGER NOT NULL, diff --git a/new_todo/todo.py b/new_todo/todo.py index 8b8331c..b7ee52b 100755 --- a/new_todo/todo.py +++ b/new_todo/todo.py @@ -3,6 +3,9 @@ from sqlite3 import connect as sql_connect from http.server import BaseHTTPRequestHandler from urllib.parse import parse_qs from datetime import datetime, timedelta +from unittest import TestCase +from os.path import isfile +from time import sleep @@ -13,6 +16,7 @@ HTML_DIR='html' DATE_FORMAT = '%Y-%m-%d' +DATETIME_KEY_RESOLUTION = 24 @@ -29,25 +33,25 @@ class HandledException(Exception): class TodoDBFile: - def __init__(self, path): - from os.path import isfile + def __init__(self, path, force_creation=False): self.path = path if not isfile(self.path): - self.make_new_if_wanted_else_abort() - self.validate_schema() - - def make_new_if_wanted_else_abort(self): - create_question = f'Database file not found: {self.path}. Create? Y/n\n' - msg_on_no = 'Interpreting reply as "no", but cannot run without database file.' - legal_yesses = {'y', 'yes'} - create_reply = input(create_question) - if not create_reply.lower() in legal_yesses: - raise HandledException(msg_on_no) + self._make_new_if_wanted_else_abort(force_creation) + self._validate_schema() + + def _make_new_if_wanted_else_abort(self, force_creation): + if not force_creation: + create_question = f'Database file not found: {self.path}. Create? Y/n\n' + msg_on_no = 'Interpreting reply as "no", but cannot run without database file.' + legal_yesses = {'y', 'yes'} + create_reply = input(create_question) + if not create_reply.lower() in legal_yesses: + raise HandledException(msg_on_no) with sql_connect(self.path) as conn: with open(PATH_DB_SCHEMA, 'r') as f: conn.executescript(f.read()) - def validate_schema(self): + def _validate_schema(self): sql_for_schema = 'SELECT sql FROM sqlite_master ORDER BY sql' msg_wrong_schema = 'Database has wrong tables schema. Diff:\n' with sql_connect(self.path) as conn: @@ -114,19 +118,35 @@ class VersionedAttribute: (self.parent.id_, date, value)) @property - def newest_date(self): + def _newest_datetime(self): return sorted(self.history.keys())[-1] + def _forked_value_at(self, date_with_time): + return getattr(self.parent.forked_from, self.name).at(date_with_time) + @property def newest(self): if 0 == len(self.history): if self.parent.forked_from: - return getattr(self.parent.forked_from, self.name).newest + return self._forked_value_at(self.parent.forked_at) + return self.default + return self.history[self._newest_datetime] + + def at(self, date_with_time): + sorted_datetimes = sorted(self.history.keys()) + if self.parent.forked_from and (0 == len(sorted_datetimes) or sorted_datetimes[0] > date_with_time): + return self._forked_value_at(date_with_time) + elif 0 == len(sorted_datetimes): return self.default - return self.history[self.newest_date] + ret = self.history[sorted_datetimes[0]] + for k, v in self.history.items(): + if k > date_with_time: + break + ret = v + return ret def set(self, value): - if 0 == len(self.history) or value != self.history[self.newest_date]: + if 0 == len(self.history) or value != self.history[self._newest_datetime]: self.history[Day.todays_date(with_time=True)] = value @@ -193,13 +213,13 @@ class Day: def date_valid(cls, date): try: result = datetime.strptime(date, DATE_FORMAT) - except ValueError: + except (ValueError, TypeError): return None return result @classmethod def todays_date(cls, with_time=False): - cut_length = 19 if with_time else 10 + cut_length = DATETIME_KEY_RESOLUTION if with_time else 10 return str(datetime.now())[:cut_length] @classmethod @@ -234,16 +254,17 @@ class Day: class TodoTemplate: - def __init__(self, db_conn, id_): + def __init__(self, db_conn, id_=None, forked_from=None, forked_at=None): self.id_ = id_ - self.forked_from = None + self.forked_from = TodoTemplate.by_id(db_conn, forked_from) + self.forked_at = forked_at self.title = VersionedAttribute(db_conn, self, 'title', 'UNNAMED') self.default_effort = VersionedAttribute(db_conn, self, 'default_effort', 1.0) self.description = VersionedAttribute(db_conn, self, 'description', '') @classmethod def from_row(cls, db_conn, row): - return cls(db_conn, row[0]) + return cls(db_conn, row[0], row[1], row[2]) @classmethod def all(cls, db_conn): @@ -257,17 +278,21 @@ class TodoTemplate: for row in db_conn.exec('SELECT * FROM templates WHERE id = ?', (id_,)): return cls.from_row(db_conn, row) if make_if_none: - return cls(db_conn, id_) + return cls(db_conn, id_, None, None) return None def save(self, db_conn): - cursor = db_conn.exec('REPLACE INTO templates VALUES (?) RETURNING ID', (self.id_,)) + cursor = db_conn.exec('REPLACE INTO templates VALUES (?,?,?) RETURNING ID', + (self.id_, self.forked_from.id_ if self.forked_from else None, self.forked_at)) if self.id_ is None: self.id_ = cursor.fetchone()[0] self.title.save(db_conn) self.default_effort.save(db_conn) self.description.save(db_conn) + def fork(self, db_conn): + return self.__class__(db_conn, id_=None, forked_from=self.id_, forked_at=Day.todays_date(with_time=True)) + ######## web stuff ############ @@ -415,6 +440,8 @@ class TodoHandler(BaseHTTPRequestHandler): +# main loop + def main(): from http.server import HTTPServer from os import environ @@ -440,3 +467,188 @@ if __name__ == '__main__': main() except HandledException as e: e.nice_exit() + + + +# testing – run with: python3 -m unittest todo.py + +class TestWithDB(TestCase): + + def setUp(self): + self.db_file = TodoDBFile(f'test_db:{datetime.now().timestamp()}', force_creation=True) + self.db_conn = TodoDBConnection(self.db_file) + self.bak_0_path = f'{self.db_file.path}.bak.0' + + def tearDown(self): + from os import remove + remove(self.db_file.path) + for i in range(0, 9): + bak_path = f'{self.db_file.path}.bak.{i}' + if isfile(f'{self.db_file.path}.bak.{i}'): + remove(bak_path) + + def test_backup_bak_0_file_content(self): + day = Day('2024-01-01') + day.save(self.db_conn) + self.db_conn.commit() # backups before writing, so expect different file contents + with open(self.db_file.path, 'rb') as f1: + original_content = f1.read() + with open(self.bak_0_path, 'rb') as f2: + backup_content = f2.read() + self.assertNotEqual(original_content, backup_content) + self.db_conn.commit() # this time commit without changes, so expect equal file contents + with open(self.db_file.path, 'rb') as f1: + original_content = f1.read() + with open(self.bak_0_path, 'rb') as f2: + backup_content = f2.read() + self.assertEqual(original_content, backup_content) + + def test_backup_bak_0_file_attributes(self): + from os import stat + sleep(0.1) # so mtime would change if not copied + self.db_conn.commit() + original_stat = stat(self.db_file.path) + backup_stat = stat(self.bak_0_path) + for name in {'st_mode', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size', 'st_atime', 'st_mtime'}: + self.assertEqual(getattr(original_stat, name), getattr(backup_stat, name)) + + def test_Day_by_date(self): + test_date_1 = '2024-02-28' + test_date_2 = '2024-02-29' + test_comment = 'foo' + day1 = Day(test_date_1, test_comment) + day1.save(self.db_conn) + retrieved_day = Day.by_date(self.db_conn, test_date_1) + self.assertEqual(day1, retrieved_day) + self.assertEqual(day1.comment, retrieved_day.comment) + self.assertEqual(None, Day.by_date(self.db_conn, test_date_2)) + self.assertEqual(Day(test_date_2), Day.by_date(self.db_conn, test_date_2, make_if_none=True)) + + def test_Day_all(self): + with self.assertRaises(HandledException): + Day.all(self.db_conn, date_range=(None, None)) + Day.all(self.db_conn, date_range=('foo', '')) + Day.all(self.db_conn, date_range=('', '2024-02-30')) + test_date_1 = str(datetime.now() - timedelta(days=2))[:10] + test_date_2 = Day.todays_date() + test_date_3 = str(datetime.now() + timedelta(days=2))[:10] + day1 = Day(test_date_1) + day2 = Day(test_date_2) + day3 = Day(test_date_3) + day1.save(self.db_conn) + day2.save(self.db_conn) + day3.save(self.db_conn) + self.assertEqual([day1, day2, day3], Day.all(self.db_conn)) + self.assertEqual([day2, day3], Day.all(self.db_conn, date_range=('yesterday', ''))) + self.assertEqual([day2, day3], Day.all(self.db_conn, date_range=(Day.yesterdays_date(), ''))) + self.assertEqual([day3], Day.all(self.db_conn, date_range=('tomorrow', ''))) + self.assertEqual([day3], Day.all(self.db_conn, date_range=(Day.tomorrows_date(), ''))) + self.assertEqual([day2], Day.all(self.db_conn, date_range=('today', Day.todays_date()))) + self.assertEqual([day1], Day.all(self.db_conn, date_range=('', 'yesterday'))) + self.assertEqual([Day(Day.yesterdays_date()), day2], + Day.all(self.db_conn, ensure_betweens=True, date_range=('yesterday', 'today'))) + self.assertEqual([day2, Day(Day.tomorrows_date())], + Day.all(self.db_conn, ensure_betweens=True, date_range=('today', 'tomorrow'))) + + def test_TodoTemplate_by_id(self): + tmpl = TodoTemplate(self.db_conn) + tmpl.save(self.db_conn) + retrieved = TodoTemplate.by_id(self.db_conn, tmpl.id_) + self.assertEqual(tmpl.id_, retrieved.id_) + self.assertEqual(None, TodoTemplate.by_id(self.db_conn, tmpl.id_ + 1)) + self.assertIsInstance(TodoTemplate.by_id(self.db_conn, tmpl.id_ + 1, make_if_none=True), TodoTemplate) + + def test_TodoTemplate_all(self): + tmpl_1 = TodoTemplate(self.db_conn) + tmpl_1.save(self.db_conn) + tmpl_2 = TodoTemplate(self.db_conn) + tmpl_2.save(self.db_conn) + self.assertEqual({tmpl_1.id_, tmpl_2.id_}, set(t.id_ for t in TodoTemplate.all(self.db_conn))) + + def test_versioned_attributes(self): + def test(name, default, values): + def wait_till_next_timestamp(timestamp): + # if we .set() to early, the timestamp key will not have changed, + # i.e. we'd simply over-write the previous value + while Day.todays_date(with_time=True) == timestamp: + sleep(0.0001) + return Day.todays_date(with_time=True) + tmpl = TodoTemplate(self.db_conn) + tmpl.save(self.db_conn) + tmpl_attr = getattr(tmpl, name) + # check we get default value on empty history + self.assertEqual(tmpl_attr.newest, default) + self.assertEqual({}, tmpl_attr.history) + # check we get new value when set + timestamp_1 = Day.todays_date(with_time=True) + tmpl_attr.set(values[0]) + self.assertEqual(tmpl_attr.newest, values[0]) + # check history remains unchanged if setting same value as .newest + timestamp_2 = wait_till_next_timestamp(timestamp_1) + tmpl_attr.set(values[0]) + self.assertEqual(len(tmpl_attr.history), 1) + # check we can access different values with .newest and .at + tmpl_attr.set(values[1]) + self.assertEqual(tmpl_attr.newest, values[1]) + self.assertEqual(tmpl_attr.at(timestamp_1), values[0]) + # check attribute history stored in DB with parent + tmpl.save(self.db_conn) + retrieved = TodoTemplate.by_id(self.db_conn, tmpl.id_) + self.assertEqual(getattr(retrieved, name).at(timestamp_1), values[0]) # i.e. attribute.save() works + # check forks can use original's history, but only up to their fork moment + fork = tmpl.fork(self.db_conn) + wait_till_next_timestamp(timestamp_2) + tmpl_attr.set(values[2]) + forked_attr = getattr(fork, name) + self.assertEqual(forked_attr.newest, values[1]) + self.assertEqual(forked_attr.at(timestamp_1), values[0]) + # check original and fork bifurcate their history + forked_attr.set(values[0]) + self.assertEqual(forked_attr.newest, values[0]) + self.assertEqual(tmpl_attr.newest, values[2]) + test('title', 'UNNAMED', ['foo', 'bar', 'baz']) + test('default_effort', 1.0, [0.5, 3, 9]) + test('description', '', ['foo', 'bar', 'baz']) + + + +class TestSansDB(TestCase): + + def test_Day_date_validate(self): + self.assertEqual(None, Day.date_valid('foo')) + self.assertEqual(None, Day.date_valid('2024-02-30')) + self.assertEqual(None, Day.date_valid('2024-01-01 23:59:59')) + self.assertEqual(datetime(2024,1,1), Day.date_valid('2024-01-01')) + + def test_Day_date_classmethods(self): + self.assertEqual(str(datetime.now())[:10], Day.todays_date()) + self.assertEqual(str(datetime.now())[:DATETIME_KEY_RESOLUTION], Day.todays_date(with_time=True)) + self.assertEqual(str(datetime.now() - timedelta(days=1))[:10], Day.yesterdays_date()) + self.assertEqual(str(datetime.now() + timedelta(days=1))[:10], Day.tomorrows_date()) + + def test_Day_init(self): + test_date = '2024-02-29' + test_comment = 'foo' + day = Day(test_date) + self.assertEqual(day.date, test_date) + self.assertEqual(day.comment, '') + day = Day(test_date, test_comment) + self.assertEqual(day.comment, test_comment) + with self.assertRaises(HandledException): + Day('foo') + + def test_Day_date_neighbors(self): + day = Day('2024-02-29') + self.assertEqual(day.prev_date, '2024-02-28') + self.assertEqual(day.next_date, '2024-03-01') + + def test_Day_date_weekday(self): + day = Day('2024-02-29') + self.assertEqual(day.weekday, 'Thursday') + + def test_Day_cmp(self): + day1 = Day('2024-01-01') + day2 = Day('2024-01-02') + day3 = Day('2024-01-03') + days = [day3, day1, day2] + self.assertEqual(sorted(days), [day1, day2, day3]) -- 2.30.2