home · contact · privacy
Add forking and testing to todo.py rewrite.
authorChristian Heller <c.heller@plomlompom.de>
Fri, 15 Mar 2024 02:10:13 +0000 (03:10 +0100)
committerChristian Heller <c.heller@plomlompom.de>
Fri, 15 Mar 2024 02:10:13 +0000 (03:10 +0100)
new_todo/init.sql
new_todo/todo.py

index 49b52d80d34f7ab6139e102d6df243fe3d8c24cc..1be14229d48dfa52ae289a9e086d6ca0606fb59f 100644 (file)
@@ -3,7 +3,9 @@ CREATE TABLE days (
   comment TEXT NOT NULL
 );
 CREATE TABLE templates (
-  id INTEGER PRIMARY KEY
+  id INTEGER PRIMARY KEY,
+  forked_from INTEGER,
+  forked_at TEXT
 );
 CREATE TABLE versioned_default_efforts (
   template INTEGER NOT NULL,
index 8b8331cea52593f5beff13bfb168d8b17e3f8a4d..b7ee52b56d742ba93f754d022d80ee0dda12aaa5 100755 (executable)
@@ -3,6 +3,9 @@ from sqlite3 import connect as sql_connect
 from http.server import BaseHTTPRequestHandler
 from urllib.parse import parse_qs
 from datetime import datetime, timedelta
+from unittest import TestCase
+from os.path import isfile
+from time import sleep
 
 
 
@@ -13,6 +16,7 @@ HTML_DIR='html'
 
 
 DATE_FORMAT = '%Y-%m-%d'
+DATETIME_KEY_RESOLUTION = 24
 
 
 
@@ -29,25 +33,25 @@ class HandledException(Exception):
 
 class TodoDBFile:
 
-    def __init__(self, path):
-        from os.path import isfile
+    def __init__(self, path, force_creation=False):
         self.path = path
         if not isfile(self.path):
-            self.make_new_if_wanted_else_abort()
-        self.validate_schema()
-
-    def make_new_if_wanted_else_abort(self):
-        create_question = f'Database file not found: {self.path}. Create? Y/n\n'
-        msg_on_no = 'Interpreting reply as "no", but cannot run without database file.'
-        legal_yesses = {'y', 'yes'}
-        create_reply = input(create_question)
-        if not create_reply.lower() in legal_yesses: 
-            raise HandledException(msg_on_no)
+            self._make_new_if_wanted_else_abort(force_creation)
+        self._validate_schema()
+
+    def _make_new_if_wanted_else_abort(self, force_creation):
+        if not force_creation:
+            create_question = f'Database file not found: {self.path}. Create? Y/n\n'
+            msg_on_no = 'Interpreting reply as "no", but cannot run without database file.'
+            legal_yesses = {'y', 'yes'}
+            create_reply = input(create_question)
+            if not create_reply.lower() in legal_yesses: 
+                raise HandledException(msg_on_no)
         with sql_connect(self.path) as conn:
             with open(PATH_DB_SCHEMA, 'r') as f:
                 conn.executescript(f.read())
 
-    def validate_schema(self):
+    def _validate_schema(self):
         sql_for_schema = 'SELECT sql FROM sqlite_master ORDER BY sql'
         msg_wrong_schema = 'Database has wrong tables schema. Diff:\n'
         with sql_connect(self.path) as conn:
@@ -114,19 +118,35 @@ class VersionedAttribute:
                          (self.parent.id_, date, value))
 
     @property
-    def newest_date(self):
+    def _newest_datetime(self):
         return sorted(self.history.keys())[-1]
 
+    def _forked_value_at(self, date_with_time):
+        return getattr(self.parent.forked_from, self.name).at(date_with_time)
+
     @property
     def newest(self):
         if 0 == len(self.history):
             if self.parent.forked_from:
-                return getattr(self.parent.forked_from, self.name).newest
+                return self._forked_value_at(self.parent.forked_at)
+            return self.default
+        return self.history[self._newest_datetime]
+
+    def at(self, date_with_time):
+        sorted_datetimes = sorted(self.history.keys())
+        if self.parent.forked_from and (0 == len(sorted_datetimes) or sorted_datetimes[0] > date_with_time):
+            return self._forked_value_at(date_with_time)
+        elif 0 == len(sorted_datetimes):
             return self.default
-        return self.history[self.newest_date]
+        ret = self.history[sorted_datetimes[0]]
+        for k, v in self.history.items():
+            if k > date_with_time:
+                break
+            ret = v
+        return ret
 
     def set(self, value):
-        if 0 == len(self.history) or value != self.history[self.newest_date]:
+        if 0 == len(self.history) or value != self.history[self._newest_datetime]:
             self.history[Day.todays_date(with_time=True)] = value
 
 
@@ -193,13 +213,13 @@ class Day:
     def date_valid(cls, date):
         try:
             result = datetime.strptime(date, DATE_FORMAT)
-        except ValueError:
+        except (ValueError, TypeError):
             return None 
         return result 
 
     @classmethod
     def todays_date(cls, with_time=False):
-        cut_length = 19 if with_time else 10
+        cut_length = DATETIME_KEY_RESOLUTION if with_time else 10
         return str(datetime.now())[:cut_length]
 
     @classmethod
@@ -234,16 +254,17 @@ class Day:
 
 class TodoTemplate:
 
-    def __init__(self, db_conn, id_):
+    def __init__(self, db_conn, id_=None, forked_from=None, forked_at=None):
         self.id_ = id_ 
-        self.forked_from = None
+        self.forked_from = TodoTemplate.by_id(db_conn, forked_from) 
+        self.forked_at = forked_at 
         self.title = VersionedAttribute(db_conn, self, 'title', 'UNNAMED') 
         self.default_effort = VersionedAttribute(db_conn, self, 'default_effort', 1.0) 
         self.description = VersionedAttribute(db_conn, self, 'description', '') 
 
     @classmethod
     def from_row(cls, db_conn, row):
-        return cls(db_conn, row[0])
+        return cls(db_conn, row[0], row[1], row[2])
 
     @classmethod
     def all(cls, db_conn):
@@ -257,17 +278,21 @@ class TodoTemplate:
         for row in db_conn.exec('SELECT * FROM templates WHERE id = ?', (id_,)):
             return cls.from_row(db_conn, row)
         if make_if_none:
-            return cls(db_conn, id_
+            return cls(db_conn, id_, None, None)
         return None
 
     def save(self, db_conn):
-        cursor = db_conn.exec('REPLACE INTO templates VALUES (?) RETURNING ID', (self.id_,))
+        cursor = db_conn.exec('REPLACE INTO templates VALUES (?,?,?) RETURNING ID',
+                              (self.id_, self.forked_from.id_ if self.forked_from else None, self.forked_at))
         if self.id_ is None:
             self.id_ = cursor.fetchone()[0]
         self.title.save(db_conn)
         self.default_effort.save(db_conn)
         self.description.save(db_conn)
 
+    def fork(self, db_conn):
+        return self.__class__(db_conn, id_=None, forked_from=self.id_, forked_at=Day.todays_date(with_time=True))
+
 
 
 ######## web stuff ############
@@ -415,6 +440,8 @@ class TodoHandler(BaseHTTPRequestHandler):
 
 
 
+# main loop
+
 def main():
     from http.server import HTTPServer
     from os import environ
@@ -440,3 +467,188 @@ if __name__ == '__main__':
         main()
     except HandledException as e:
         e.nice_exit()
+
+
+
+# testing – run with: python3 -m unittest todo.py
+
+class TestWithDB(TestCase):
+
+    def setUp(self):
+        self.db_file = TodoDBFile(f'test_db:{datetime.now().timestamp()}', force_creation=True)
+        self.db_conn = TodoDBConnection(self.db_file)
+        self.bak_0_path = f'{self.db_file.path}.bak.0'
+
+    def tearDown(self):
+        from os import remove
+        remove(self.db_file.path)
+        for i in range(0, 9):
+            bak_path = f'{self.db_file.path}.bak.{i}'
+            if isfile(f'{self.db_file.path}.bak.{i}'):
+                remove(bak_path)
+
+    def test_backup_bak_0_file_content(self):
+        day = Day('2024-01-01')
+        day.save(self.db_conn)
+        self.db_conn.commit()  # backups before writing, so expect different file contents
+        with open(self.db_file.path, 'rb') as f1:
+            original_content = f1.read()
+            with open(self.bak_0_path, 'rb') as f2:
+                backup_content = f2.read()
+        self.assertNotEqual(original_content, backup_content)
+        self.db_conn.commit()  # this time commit without changes, so expect equal file contents
+        with open(self.db_file.path, 'rb') as f1:
+            original_content = f1.read()
+            with open(self.bak_0_path, 'rb') as f2:
+                backup_content = f2.read()
+        self.assertEqual(original_content, backup_content)
+
+    def test_backup_bak_0_file_attributes(self):
+        from os import stat
+        sleep(0.1)  # so mtime would change if not copied
+        self.db_conn.commit()
+        original_stat = stat(self.db_file.path)
+        backup_stat = stat(self.bak_0_path)
+        for name in {'st_mode', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size', 'st_atime', 'st_mtime'}:
+            self.assertEqual(getattr(original_stat, name), getattr(backup_stat, name))
+
+    def test_Day_by_date(self):
+        test_date_1 = '2024-02-28' 
+        test_date_2 = '2024-02-29' 
+        test_comment = 'foo'
+        day1 = Day(test_date_1, test_comment)
+        day1.save(self.db_conn)
+        retrieved_day = Day.by_date(self.db_conn, test_date_1)
+        self.assertEqual(day1, retrieved_day)
+        self.assertEqual(day1.comment, retrieved_day.comment)
+        self.assertEqual(None, Day.by_date(self.db_conn, test_date_2))
+        self.assertEqual(Day(test_date_2), Day.by_date(self.db_conn, test_date_2, make_if_none=True))
+
+    def test_Day_all(self):
+        with self.assertRaises(HandledException):
+            Day.all(self.db_conn, date_range=(None, None))
+            Day.all(self.db_conn, date_range=('foo', ''))
+            Day.all(self.db_conn, date_range=('', '2024-02-30'))
+        test_date_1 = str(datetime.now() - timedelta(days=2))[:10]
+        test_date_2 = Day.todays_date() 
+        test_date_3 = str(datetime.now() + timedelta(days=2))[:10]
+        day1 = Day(test_date_1)
+        day2 = Day(test_date_2)
+        day3 = Day(test_date_3)
+        day1.save(self.db_conn)
+        day2.save(self.db_conn)
+        day3.save(self.db_conn)
+        self.assertEqual([day1, day2, day3], Day.all(self.db_conn))
+        self.assertEqual([day2, day3], Day.all(self.db_conn, date_range=('yesterday', '')))
+        self.assertEqual([day2, day3], Day.all(self.db_conn, date_range=(Day.yesterdays_date(), '')))
+        self.assertEqual([day3], Day.all(self.db_conn, date_range=('tomorrow', '')))
+        self.assertEqual([day3], Day.all(self.db_conn, date_range=(Day.tomorrows_date(), '')))
+        self.assertEqual([day2], Day.all(self.db_conn, date_range=('today', Day.todays_date())))
+        self.assertEqual([day1], Day.all(self.db_conn, date_range=('', 'yesterday')))
+        self.assertEqual([Day(Day.yesterdays_date()), day2],
+                         Day.all(self.db_conn, ensure_betweens=True, date_range=('yesterday', 'today')))
+        self.assertEqual([day2, Day(Day.tomorrows_date())],
+                         Day.all(self.db_conn, ensure_betweens=True, date_range=('today', 'tomorrow')))
+
+    def test_TodoTemplate_by_id(self):
+        tmpl = TodoTemplate(self.db_conn)
+        tmpl.save(self.db_conn)
+        retrieved = TodoTemplate.by_id(self.db_conn, tmpl.id_)
+        self.assertEqual(tmpl.id_, retrieved.id_)
+        self.assertEqual(None, TodoTemplate.by_id(self.db_conn, tmpl.id_ + 1))
+        self.assertIsInstance(TodoTemplate.by_id(self.db_conn, tmpl.id_ + 1, make_if_none=True), TodoTemplate)
+
+    def test_TodoTemplate_all(self):
+        tmpl_1 = TodoTemplate(self.db_conn)
+        tmpl_1.save(self.db_conn)
+        tmpl_2 = TodoTemplate(self.db_conn)
+        tmpl_2.save(self.db_conn)
+        self.assertEqual({tmpl_1.id_, tmpl_2.id_}, set(t.id_ for t in TodoTemplate.all(self.db_conn)))
+
+    def test_versioned_attributes(self):
+        def test(name, default, values):
+            def wait_till_next_timestamp(timestamp):
+                # if we .set() to early, the timestamp key will not have changed,
+                # i.e. we'd simply over-write the previous value 
+                while Day.todays_date(with_time=True) == timestamp:
+                    sleep(0.0001)
+                return Day.todays_date(with_time=True) 
+            tmpl = TodoTemplate(self.db_conn)
+            tmpl.save(self.db_conn)
+            tmpl_attr = getattr(tmpl, name)
+            # check we get default value on empty history
+            self.assertEqual(tmpl_attr.newest, default)
+            self.assertEqual({}, tmpl_attr.history)
+            # check we get new value when set 
+            timestamp_1 = Day.todays_date(with_time=True)
+            tmpl_attr.set(values[0])
+            self.assertEqual(tmpl_attr.newest, values[0])
+            # check history remains unchanged if setting same value as .newest 
+            timestamp_2 = wait_till_next_timestamp(timestamp_1)
+            tmpl_attr.set(values[0])
+            self.assertEqual(len(tmpl_attr.history), 1)
+            # check we can access different values with .newest and .at 
+            tmpl_attr.set(values[1])
+            self.assertEqual(tmpl_attr.newest, values[1])
+            self.assertEqual(tmpl_attr.at(timestamp_1), values[0])
+            # check attribute history stored in DB with parent
+            tmpl.save(self.db_conn)
+            retrieved = TodoTemplate.by_id(self.db_conn, tmpl.id_)
+            self.assertEqual(getattr(retrieved, name).at(timestamp_1), values[0])  # i.e. attribute.save() works
+            # check forks can use original's history, but only up to their fork moment
+            fork = tmpl.fork(self.db_conn)
+            wait_till_next_timestamp(timestamp_2)
+            tmpl_attr.set(values[2])
+            forked_attr = getattr(fork, name)
+            self.assertEqual(forked_attr.newest, values[1])
+            self.assertEqual(forked_attr.at(timestamp_1), values[0])
+            # check original and fork bifurcate their history 
+            forked_attr.set(values[0])
+            self.assertEqual(forked_attr.newest, values[0])
+            self.assertEqual(tmpl_attr.newest, values[2])
+        test('title', 'UNNAMED', ['foo', 'bar', 'baz'])
+        test('default_effort', 1.0, [0.5, 3, 9])
+        test('description', '', ['foo', 'bar', 'baz'])
+
+
+
+class TestSansDB(TestCase):
+
+    def test_Day_date_validate(self):
+        self.assertEqual(None, Day.date_valid('foo'))
+        self.assertEqual(None, Day.date_valid('2024-02-30'))
+        self.assertEqual(None, Day.date_valid('2024-01-01 23:59:59'))
+        self.assertEqual(datetime(2024,1,1), Day.date_valid('2024-01-01'))
+
+    def test_Day_date_classmethods(self):
+        self.assertEqual(str(datetime.now())[:10], Day.todays_date())
+        self.assertEqual(str(datetime.now())[:DATETIME_KEY_RESOLUTION], Day.todays_date(with_time=True))
+        self.assertEqual(str(datetime.now() - timedelta(days=1))[:10], Day.yesterdays_date())
+        self.assertEqual(str(datetime.now() + timedelta(days=1))[:10], Day.tomorrows_date())
+
+    def test_Day_init(self):
+        test_date = '2024-02-29'
+        test_comment = 'foo'
+        day = Day(test_date)
+        self.assertEqual(day.date, test_date)
+        self.assertEqual(day.comment, '')
+        day = Day(test_date, test_comment)
+        self.assertEqual(day.comment, test_comment)
+        with self.assertRaises(HandledException):
+            Day('foo')
+
+    def test_Day_date_neighbors(self):
+        day = Day('2024-02-29')
+        self.assertEqual(day.prev_date, '2024-02-28')
+        self.assertEqual(day.next_date, '2024-03-01')
+
+    def test_Day_date_weekday(self):
+        day = Day('2024-02-29')
+        self.assertEqual(day.weekday, 'Thursday')
+
+    def test_Day_cmp(self):
+        day1 = Day('2024-01-01')
+        day2 = Day('2024-01-02')
+        day3 = Day('2024-01-03')
+        days = [day3, day1, day2]
+        self.assertEqual(sorted(days), [day1, day2, day3])