X-Git-Url: https://plomlompom.com/repos/%7B%7Bdb.prefix%7D%7D/day?a=blobdiff_plain;f=new_todo%2Ftodo.py;h=7a5ed2dbcf7da166439302c2c251733e8031f476;hb=0cd0b92d9e261dbe75fa45001aeca74592c053f8;hp=8b8331cea52593f5beff13bfb168d8b17e3f8a4d;hpb=9637ae6c570e9cf0cce500d85af03c74d6771362;p=misc diff --git a/new_todo/todo.py b/new_todo/todo.py index 8b8331c..7a5ed2d 100755 --- a/new_todo/todo.py +++ b/new_todo/todo.py @@ -3,6 +3,9 @@ from sqlite3 import connect as sql_connect from http.server import BaseHTTPRequestHandler from urllib.parse import parse_qs from datetime import datetime, timedelta +from unittest import TestCase +from os.path import isfile +from time import sleep @@ -13,6 +16,7 @@ HTML_DIR='html' DATE_FORMAT = '%Y-%m-%d' +DATETIME_KEY_RESOLUTION = 24 @@ -29,25 +33,25 @@ class HandledException(Exception): class TodoDBFile: - def __init__(self, path): - from os.path import isfile + def __init__(self, path, force_creation=False): self.path = path if not isfile(self.path): - self.make_new_if_wanted_else_abort() - self.validate_schema() - - def make_new_if_wanted_else_abort(self): - create_question = f'Database file not found: {self.path}. Create? Y/n\n' - msg_on_no = 'Interpreting reply as "no", but cannot run without database file.' - legal_yesses = {'y', 'yes'} - create_reply = input(create_question) - if not create_reply.lower() in legal_yesses: - raise HandledException(msg_on_no) + self._make_new_if_wanted_else_abort(force_creation) + self._validate_schema() + + def _make_new_if_wanted_else_abort(self, force_creation): + if not force_creation: + create_question = f'Database file not found: {self.path}. Create? Y/n\n' + msg_on_no = 'Interpreting reply as "no", but cannot run without database file.' + legal_yesses = {'y', 'yes'} + create_reply = input(create_question) + if not create_reply.lower() in legal_yesses: + raise HandledException(msg_on_no) with sql_connect(self.path) as conn: with open(PATH_DB_SCHEMA, 'r') as f: conn.executescript(f.read()) - def validate_schema(self): + def _validate_schema(self): sql_for_schema = 'SELECT sql FROM sqlite_master ORDER BY sql' msg_wrong_schema = 'Database has wrong tables schema. Diff:\n' with sql_connect(self.path) as conn: @@ -76,6 +80,7 @@ class TodoDBConnection: def __init__(self, db_file): self.file = db_file self.conn = sql_connect(self.file.path) + self.conn.execute('PRAGMA foreign_keys = ON') def commit(self): self.file.backup() @@ -113,20 +118,39 @@ class VersionedAttribute: db_conn.exec(f'REPLACE INTO {self.table_name} VALUES (?, ?, ?)', (self.parent.id_, date, value)) + def delete(self, db_conn): + db_conn.exec(f'DELETE FROM {self.table_name} WHERE template = ?', (self.parent.id_,)) + @property - def newest_date(self): + def _newest_datetime(self): return sorted(self.history.keys())[-1] + def _forked_value_at(self, date_with_time): + return getattr(self.parent.forked_from, self.name).at(date_with_time) + @property def newest(self): if 0 == len(self.history): if self.parent.forked_from: - return getattr(self.parent.forked_from, self.name).newest + return self._forked_value_at(self.parent.forked_at) return self.default - return self.history[self.newest_date] + return self.history[self._newest_datetime] + + def at(self, date_with_time): + sorted_datetimes = sorted(self.history.keys()) + if self.parent.forked_from and (0 == len(sorted_datetimes) or sorted_datetimes[0] > date_with_time): + return self._forked_value_at(date_with_time) + elif 0 == len(sorted_datetimes): + return self.default + ret = self.history[sorted_datetimes[0]] + for k, v in self.history.items(): + if k > date_with_time: + break + ret = v + return ret def set(self, value): - if 0 == len(self.history) or value != self.history[self.newest_date]: + if 0 == len(self.history) or value != self.history[self._newest_datetime]: self.history[Day.todays_date(with_time=True)] = value @@ -193,13 +217,13 @@ class Day: def date_valid(cls, date): try: result = datetime.strptime(date, DATE_FORMAT) - except ValueError: + except (ValueError, TypeError): return None return result @classmethod def todays_date(cls, with_time=False): - cut_length = 19 if with_time else 10 + cut_length = DATETIME_KEY_RESOLUTION if with_time else 10 return str(datetime.now())[:cut_length] @classmethod @@ -234,16 +258,17 @@ class Day: class TodoTemplate: - def __init__(self, db_conn, id_): + def __init__(self, db_conn, id_=None, forked_from=None, forked_at=None): self.id_ = id_ - self.forked_from = None + self.forked_from = TodoTemplate.by_id(db_conn, forked_from) + self.forked_at = forked_at self.title = VersionedAttribute(db_conn, self, 'title', 'UNNAMED') self.default_effort = VersionedAttribute(db_conn, self, 'default_effort', 1.0) self.description = VersionedAttribute(db_conn, self, 'description', '') @classmethod def from_row(cls, db_conn, row): - return cls(db_conn, row[0]) + return cls(db_conn, row[0], row[1], row[2]) @classmethod def all(cls, db_conn): @@ -257,26 +282,41 @@ class TodoTemplate: for row in db_conn.exec('SELECT * FROM templates WHERE id = ?', (id_,)): return cls.from_row(db_conn, row) if make_if_none: - return cls(db_conn, id_) + return cls(db_conn, id_, None, None) return None def save(self, db_conn): - cursor = db_conn.exec('REPLACE INTO templates VALUES (?) RETURNING ID', (self.id_,)) + cursor = db_conn.exec('REPLACE INTO templates VALUES (?,?,?) RETURNING ID', + (self.id_, self.forked_from.id_ if self.forked_from else None, self.forked_at)) if self.id_ is None: self.id_ = cursor.fetchone()[0] self.title.save(db_conn) self.default_effort.save(db_conn) self.description.save(db_conn) + def delete(self, db_conn): + for row in db_conn.exec('SELECT * FROM templates WHERE forked_from = ?', (self.id_,)): + raise HandledException('cannot delete forking reference') + self.title.delete(db_conn) + self.default_effort.delete(db_conn) + self.description.delete(db_conn) + db_conn.exec('DELETE FROM templates WHERE id = ?', (self.id_,)) + + def fork(self, db_conn): + forked = self.__class__(db_conn, id_=None, forked_from=self.id_, + forked_at=Day.todays_date(with_time=True)) + forked.save(db_conn) + return forked + ######## web stuff ############ class ParamsParser: - def __init__(self, url_query, site_cookie): + def __init__(self, url_query, site_cookie_dict): self.params = parse_qs(url_query, keep_blank_values=True) - self.cookie = site_cookie + self.cookie = site_cookie_dict def get(self, key, default): return self.params.get(key, [default])[0] @@ -367,10 +407,15 @@ class TodoHandler(BaseHTTPRequestHandler): def do_POST_template(self): id_ = self.params.get('id', None) tmpl = TodoTemplate.by_id(self.db_conn, id_, make_if_none=True) - tmpl.title.set(self.postvars['title'][0]) - tmpl.default_effort.set(float(self.postvars['default_effort'][0])) - tmpl.description.set(self.postvars['description'][0]) - tmpl.save(self.db_conn) + if 'update' in self.postvars.keys(): + tmpl.title.set(self.postvars['title'][0]) + tmpl.default_effort.set(float(self.postvars['default_effort'][0])) + tmpl.description.set(self.postvars['description'][0]) + tmpl.save(self.db_conn) + elif 'fork' in self.postvars.keys(): + tmpl.fork(self.db_conn) + elif 'delete' in self.postvars.keys(): + tmpl.delete(self.db_conn) self.redir_url = 'templates' # GET routes @@ -415,6 +460,8 @@ class TodoHandler(BaseHTTPRequestHandler): +# main loop + def main(): from http.server import HTTPServer from os import environ @@ -440,3 +487,224 @@ if __name__ == '__main__': main() except HandledException as e: e.nice_exit() + + + +# testing – run with: python3 -m unittest todo.py + +class TestWithDB(TestCase): + + def setUp(self): + self.db_file = TodoDBFile(f'test_db:{datetime.now().timestamp()}', force_creation=True) + self.db_conn = TodoDBConnection(self.db_file) + self.bak_0_path = f'{self.db_file.path}.bak.0' + + def tearDown(self): + from os import remove + remove(self.db_file.path) + for i in range(0, 9): + bak_path = f'{self.db_file.path}.bak.{i}' + if isfile(f'{self.db_file.path}.bak.{i}'): + remove(bak_path) + + def test_backup_bak_0_file_content(self): + day = Day('2024-01-01') + day.save(self.db_conn) + self.db_conn.commit() # backups before writing, so expect different file contents + with open(self.db_file.path, 'rb') as f1: + original_content = f1.read() + with open(self.bak_0_path, 'rb') as f2: + backup_content = f2.read() + self.assertNotEqual(original_content, backup_content) + self.db_conn.commit() # this time commit without changes, so expect equal file contents + with open(self.db_file.path, 'rb') as f1: + original_content = f1.read() + with open(self.bak_0_path, 'rb') as f2: + backup_content = f2.read() + self.assertEqual(original_content, backup_content) + + def test_backup_bak_0_file_attributes(self): + from os import stat + sleep(0.1) # so mtime would change if not copied + self.db_conn.commit() + original_stat = stat(self.db_file.path) + backup_stat = stat(self.bak_0_path) + for name in {'st_mode', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size', 'st_atime', 'st_mtime'}: + self.assertEqual(getattr(original_stat, name), getattr(backup_stat, name)) + + def test_Day_by_date(self): + test_date_1 = '2024-02-28' + test_date_2 = '2024-02-29' + test_comment = 'foo' + day1 = Day(test_date_1, test_comment) + day1.save(self.db_conn) + retrieved_day = Day.by_date(self.db_conn, test_date_1) + self.assertEqual(day1, retrieved_day) + self.assertEqual(day1.comment, retrieved_day.comment) + self.assertEqual(None, Day.by_date(self.db_conn, test_date_2)) + self.assertEqual(Day(test_date_2), Day.by_date(self.db_conn, test_date_2, make_if_none=True)) + + def test_Day_all(self): + with self.assertRaises(HandledException): + Day.all(self.db_conn, date_range=(None, None)) + Day.all(self.db_conn, date_range=('foo', '')) + Day.all(self.db_conn, date_range=('', '2024-02-30')) + test_date_1 = str(datetime.now() - timedelta(days=2))[:10] + test_date_2 = Day.todays_date() + test_date_3 = str(datetime.now() + timedelta(days=2))[:10] + day1 = Day(test_date_1) + day2 = Day(test_date_2) + day3 = Day(test_date_3) + day1.save(self.db_conn) + day2.save(self.db_conn) + day3.save(self.db_conn) + self.assertEqual([day1, day2, day3], Day.all(self.db_conn)) + self.assertEqual([day2, day3], Day.all(self.db_conn, date_range=('yesterday', ''))) + self.assertEqual([day2, day3], Day.all(self.db_conn, date_range=(Day.yesterdays_date(), ''))) + self.assertEqual([day3], Day.all(self.db_conn, date_range=('tomorrow', ''))) + self.assertEqual([day3], Day.all(self.db_conn, date_range=(Day.tomorrows_date(), ''))) + self.assertEqual([day2], Day.all(self.db_conn, date_range=('today', Day.todays_date()))) + self.assertEqual([day1], Day.all(self.db_conn, date_range=('', 'yesterday'))) + self.assertEqual([Day(Day.yesterdays_date()), day2], + Day.all(self.db_conn, ensure_betweens=True, date_range=('yesterday', 'today'))) + self.assertEqual([day2, Day(Day.tomorrows_date())], + Day.all(self.db_conn, ensure_betweens=True, date_range=('today', 'tomorrow'))) + + def test_TodoTemplate_by_id(self): + tmpl = TodoTemplate(self.db_conn) + tmpl.save(self.db_conn) + retrieved = TodoTemplate.by_id(self.db_conn, tmpl.id_) + self.assertEqual(tmpl.id_, retrieved.id_) + self.assertEqual(None, TodoTemplate.by_id(self.db_conn, tmpl.id_ + 1)) + self.assertIsInstance(TodoTemplate.by_id(self.db_conn, tmpl.id_ + 1, make_if_none=True), TodoTemplate) + + def test_TodoTemplate_all(self): + tmpl_1 = TodoTemplate(self.db_conn) + tmpl_1.save(self.db_conn) + tmpl_2 = TodoTemplate(self.db_conn) + tmpl_2.save(self.db_conn) + self.assertEqual({tmpl_1.id_, tmpl_2.id_}, set(t.id_ for t in TodoTemplate.all(self.db_conn))) + + def test_TodoTemplate_delete(self): + tmpl = TodoTemplate(self.db_conn) + tmpl.title.set('foo') + tmpl.save(self.db_conn) + self.assertIsInstance(TodoTemplate.by_id(self.db_conn, tmpl.id_), TodoTemplate) + tmpl.delete(self.db_conn) + self.assertEqual(None, TodoTemplate.by_id(self.db_conn, tmpl.id_)) + tmpl = TodoTemplate(self.db_conn) + tmpl.save(self.db_conn) + fork = tmpl.fork(self.db_conn) + with self.assertRaises(HandledException): + tmpl.delete(self.db_conn) + self.assertIsInstance(TodoTemplate.by_id(self.db_conn, tmpl.id_), TodoTemplate) + fork.delete(self.db_conn) + tmpl.delete(self.db_conn) + self.assertEqual(None, TodoTemplate.by_id(self.db_conn, tmpl.id_)) + + def test_versioned_attributes(self): + def test(name, default, values): + def wait_till_next_timestamp(timestamp): + # if we .set() to early, the timestamp key will not have changed, + # i.e. we'd simply over-write the previous value + while Day.todays_date(with_time=True) == timestamp: + sleep(0.0001) + return Day.todays_date(with_time=True) + tmpl = TodoTemplate(self.db_conn) + tmpl.save(self.db_conn) + tmpl_attr = getattr(tmpl, name) + # check we get default value on empty history + self.assertEqual(tmpl_attr.newest, default) + self.assertEqual({}, tmpl_attr.history) + # check we get new value when set + tmpl_attr.set(values[0]) + timestamp_1 = Day.todays_date(with_time=True) + self.assertEqual(tmpl_attr.newest, values[0]) + # check history remains unchanged if setting same value as .newest + timestamp_2 = wait_till_next_timestamp(timestamp_1) + tmpl_attr.set(values[0]) + self.assertEqual(len(tmpl_attr.history), 1) + # check we can access different values with .newest and .at + tmpl_attr.set(values[1]) + self.assertEqual(tmpl_attr.newest, values[1]) + self.assertEqual(tmpl_attr.at(timestamp_1), values[0]) + # check attribute history stored in DB with parent + tmpl.save(self.db_conn) + retrieved = TodoTemplate.by_id(self.db_conn, tmpl.id_) + self.assertEqual(getattr(retrieved, name).at(timestamp_1), values[0]) # i.e. attribute.save() works + # check forks can use original's history, but only up to their fork moment + fork = tmpl.fork(self.db_conn) + wait_till_next_timestamp(timestamp_2) + tmpl_attr.set(values[2]) + forked_attr = getattr(fork, name) + self.assertEqual(forked_attr.newest, values[1]) + self.assertEqual(forked_attr.at(timestamp_1), values[0]) + # check original and fork bifurcate their history + forked_attr.set(values[0]) + self.assertEqual(forked_attr.newest, values[0]) + self.assertEqual(tmpl_attr.newest, values[2]) + test('title', 'UNNAMED', ['foo', 'bar', 'baz']) + test('default_effort', 1.0, [0.5, 3, 9]) + test('description', '', ['foo', 'bar', 'baz']) + + + +class TestSansDB(TestCase): + + def test_Day_date_validate(self): + self.assertEqual(None, Day.date_valid('foo')) + self.assertEqual(None, Day.date_valid('2024-02-30')) + self.assertEqual(None, Day.date_valid('2024-01-01 23:59:59')) + self.assertEqual(datetime(2024,1,1), Day.date_valid('2024-01-01')) + + def test_Day_date_classmethods(self): + self.assertEqual(str(datetime.now())[:10], Day.todays_date()) + self.assertEqual(str(datetime.now())[:DATETIME_KEY_RESOLUTION], Day.todays_date(with_time=True)) + self.assertEqual(str(datetime.now() - timedelta(days=1))[:10], Day.yesterdays_date()) + self.assertEqual(str(datetime.now() + timedelta(days=1))[:10], Day.tomorrows_date()) + + def test_Day_init(self): + test_date = '2024-02-29' + test_comment = 'foo' + day = Day(test_date) + self.assertEqual(day.date, test_date) + self.assertEqual(day.comment, '') + day = Day(test_date, test_comment) + self.assertEqual(day.comment, test_comment) + with self.assertRaises(HandledException): + Day('foo') + + def test_Day_date_neighbors(self): + day = Day('2024-02-29') + self.assertEqual(day.prev_date, '2024-02-28') + self.assertEqual(day.next_date, '2024-03-01') + + def test_Day_date_weekday(self): + day = Day('2024-02-29') + self.assertEqual(day.weekday, 'Thursday') + + def test_Day_cmp(self): + day1 = Day('2024-01-01') + day2 = Day('2024-01-02') + day3 = Day('2024-01-03') + days = [day3, day1, day2] + self.assertEqual(sorted(days), [day1, day2, day3]) + + def test_ParamsParser(self): + from urllib.parse import urlencode + q = urlencode({'is_foo': 'foo', 'is_whitespace': '', 'is_None': None}) + c = {'is_bar': 'bar'} + p = ParamsParser(q, c) + # check basic retrieval and default behavior + self.assertEqual(p.get('is_foo', None), 'foo') + self.assertEqual(p.get('missing', None), None) + self.assertEqual(p.get('missing', 'default'), 'default') + # check handling of empty and None values + self.assertEqual(p.get('missing', ''), '') + self.assertEqual(p.get('is_whitespace', None), '') + self.assertEqual(p.get('is_None', None), 'None') # TODO: unwanted behavior, or urlencode fault? + # check retrieval and setting of cookied values + self.assertEqual(p.get_cookied('missing', None), None) + self.assertEqual(c['missing'], None) + self.assertEqual(p.get_cookied('missing', 'default'), None) + self.assertEqual(p.get_cookied('is_bar', None), 'bar')