from http.server import BaseHTTPRequestHandler
from urllib.parse import parse_qs
from datetime import datetime, timedelta
+from unittest import TestCase
+from os.path import isfile
+from time import sleep
DATE_FORMAT = '%Y-%m-%d'
+DATETIME_KEY_RESOLUTION = 24
class TodoDBFile:
- def __init__(self, path):
- from os.path import isfile
+ def __init__(self, path, force_creation=False):
self.path = path
if not isfile(self.path):
- self.make_new_if_wanted_else_abort()
- self.validate_schema()
-
- def make_new_if_wanted_else_abort(self):
- create_question = f'Database file not found: {self.path}. Create? Y/n\n'
- msg_on_no = 'Interpreting reply as "no", but cannot run without database file.'
- legal_yesses = {'y', 'yes'}
- create_reply = input(create_question)
- if not create_reply.lower() in legal_yesses:
- raise HandledException(msg_on_no)
+ self._make_new_if_wanted_else_abort(force_creation)
+ self._validate_schema()
+
+ def _make_new_if_wanted_else_abort(self, force_creation):
+ if not force_creation:
+ create_question = f'Database file not found: {self.path}. Create? Y/n\n'
+ msg_on_no = 'Interpreting reply as "no", but cannot run without database file.'
+ legal_yesses = {'y', 'yes'}
+ create_reply = input(create_question)
+ if not create_reply.lower() in legal_yesses:
+ raise HandledException(msg_on_no)
with sql_connect(self.path) as conn:
with open(PATH_DB_SCHEMA, 'r') as f:
conn.executescript(f.read())
- def validate_schema(self):
+ def _validate_schema(self):
sql_for_schema = 'SELECT sql FROM sqlite_master ORDER BY sql'
msg_wrong_schema = 'Database has wrong tables schema. Diff:\n'
with sql_connect(self.path) as conn:
(self.parent.id_, date, value))
@property
- def newest_date(self):
+ def _newest_datetime(self):
return sorted(self.history.keys())[-1]
+ def _forked_value_at(self, date_with_time):
+ return getattr(self.parent.forked_from, self.name).at(date_with_time)
+
@property
def newest(self):
if 0 == len(self.history):
if self.parent.forked_from:
- return getattr(self.parent.forked_from, self.name).newest
+ return self._forked_value_at(self.parent.forked_at)
+ return self.default
+ return self.history[self._newest_datetime]
+
+ def at(self, date_with_time):
+ sorted_datetimes = sorted(self.history.keys())
+ if self.parent.forked_from and (0 == len(sorted_datetimes) or sorted_datetimes[0] > date_with_time):
+ return self._forked_value_at(date_with_time)
+ elif 0 == len(sorted_datetimes):
return self.default
- return self.history[self.newest_date]
+ ret = self.history[sorted_datetimes[0]]
+ for k, v in self.history.items():
+ if k > date_with_time:
+ break
+ ret = v
+ return ret
def set(self, value):
- if 0 == len(self.history) or value != self.history[self.newest_date]:
+ if 0 == len(self.history) or value != self.history[self._newest_datetime]:
self.history[Day.todays_date(with_time=True)] = value
def date_valid(cls, date):
try:
result = datetime.strptime(date, DATE_FORMAT)
- except ValueError:
+ except (ValueError, TypeError):
return None
return result
@classmethod
def todays_date(cls, with_time=False):
- cut_length = 19 if with_time else 10
+ cut_length = DATETIME_KEY_RESOLUTION if with_time else 10
return str(datetime.now())[:cut_length]
@classmethod
class TodoTemplate:
- def __init__(self, db_conn, id_):
+ def __init__(self, db_conn, id_=None, forked_from=None, forked_at=None):
self.id_ = id_
- self.forked_from = None
+ self.forked_from = TodoTemplate.by_id(db_conn, forked_from)
+ self.forked_at = forked_at
self.title = VersionedAttribute(db_conn, self, 'title', 'UNNAMED')
self.default_effort = VersionedAttribute(db_conn, self, 'default_effort', 1.0)
self.description = VersionedAttribute(db_conn, self, 'description', '')
@classmethod
def from_row(cls, db_conn, row):
- return cls(db_conn, row[0])
+ return cls(db_conn, row[0], row[1], row[2])
@classmethod
def all(cls, db_conn):
for row in db_conn.exec('SELECT * FROM templates WHERE id = ?', (id_,)):
return cls.from_row(db_conn, row)
if make_if_none:
- return cls(db_conn, id_)
+ return cls(db_conn, id_, None, None)
return None
def save(self, db_conn):
- cursor = db_conn.exec('REPLACE INTO templates VALUES (?) RETURNING ID', (self.id_,))
+ cursor = db_conn.exec('REPLACE INTO templates VALUES (?,?,?) RETURNING ID',
+ (self.id_, self.forked_from.id_ if self.forked_from else None, self.forked_at))
if self.id_ is None:
self.id_ = cursor.fetchone()[0]
self.title.save(db_conn)
self.default_effort.save(db_conn)
self.description.save(db_conn)
+ def fork(self, db_conn):
+ return self.__class__(db_conn, id_=None, forked_from=self.id_, forked_at=Day.todays_date(with_time=True))
+
######## web stuff ############
+# main loop
+
def main():
from http.server import HTTPServer
from os import environ
main()
except HandledException as e:
e.nice_exit()
+
+
+
+# testing – run with: python3 -m unittest todo.py
+
+class TestWithDB(TestCase):
+
+ def setUp(self):
+ self.db_file = TodoDBFile(f'test_db:{datetime.now().timestamp()}', force_creation=True)
+ self.db_conn = TodoDBConnection(self.db_file)
+ self.bak_0_path = f'{self.db_file.path}.bak.0'
+
+ def tearDown(self):
+ from os import remove
+ remove(self.db_file.path)
+ for i in range(0, 9):
+ bak_path = f'{self.db_file.path}.bak.{i}'
+ if isfile(f'{self.db_file.path}.bak.{i}'):
+ remove(bak_path)
+
+ def test_backup_bak_0_file_content(self):
+ day = Day('2024-01-01')
+ day.save(self.db_conn)
+ self.db_conn.commit() # backups before writing, so expect different file contents
+ with open(self.db_file.path, 'rb') as f1:
+ original_content = f1.read()
+ with open(self.bak_0_path, 'rb') as f2:
+ backup_content = f2.read()
+ self.assertNotEqual(original_content, backup_content)
+ self.db_conn.commit() # this time commit without changes, so expect equal file contents
+ with open(self.db_file.path, 'rb') as f1:
+ original_content = f1.read()
+ with open(self.bak_0_path, 'rb') as f2:
+ backup_content = f2.read()
+ self.assertEqual(original_content, backup_content)
+
+ def test_backup_bak_0_file_attributes(self):
+ from os import stat
+ sleep(0.1) # so mtime would change if not copied
+ self.db_conn.commit()
+ original_stat = stat(self.db_file.path)
+ backup_stat = stat(self.bak_0_path)
+ for name in {'st_mode', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size', 'st_atime', 'st_mtime'}:
+ self.assertEqual(getattr(original_stat, name), getattr(backup_stat, name))
+
+ def test_Day_by_date(self):
+ test_date_1 = '2024-02-28'
+ test_date_2 = '2024-02-29'
+ test_comment = 'foo'
+ day1 = Day(test_date_1, test_comment)
+ day1.save(self.db_conn)
+ retrieved_day = Day.by_date(self.db_conn, test_date_1)
+ self.assertEqual(day1, retrieved_day)
+ self.assertEqual(day1.comment, retrieved_day.comment)
+ self.assertEqual(None, Day.by_date(self.db_conn, test_date_2))
+ self.assertEqual(Day(test_date_2), Day.by_date(self.db_conn, test_date_2, make_if_none=True))
+
+ def test_Day_all(self):
+ with self.assertRaises(HandledException):
+ Day.all(self.db_conn, date_range=(None, None))
+ Day.all(self.db_conn, date_range=('foo', ''))
+ Day.all(self.db_conn, date_range=('', '2024-02-30'))
+ test_date_1 = str(datetime.now() - timedelta(days=2))[:10]
+ test_date_2 = Day.todays_date()
+ test_date_3 = str(datetime.now() + timedelta(days=2))[:10]
+ day1 = Day(test_date_1)
+ day2 = Day(test_date_2)
+ day3 = Day(test_date_3)
+ day1.save(self.db_conn)
+ day2.save(self.db_conn)
+ day3.save(self.db_conn)
+ self.assertEqual([day1, day2, day3], Day.all(self.db_conn))
+ self.assertEqual([day2, day3], Day.all(self.db_conn, date_range=('yesterday', '')))
+ self.assertEqual([day2, day3], Day.all(self.db_conn, date_range=(Day.yesterdays_date(), '')))
+ self.assertEqual([day3], Day.all(self.db_conn, date_range=('tomorrow', '')))
+ self.assertEqual([day3], Day.all(self.db_conn, date_range=(Day.tomorrows_date(), '')))
+ self.assertEqual([day2], Day.all(self.db_conn, date_range=('today', Day.todays_date())))
+ self.assertEqual([day1], Day.all(self.db_conn, date_range=('', 'yesterday')))
+ self.assertEqual([Day(Day.yesterdays_date()), day2],
+ Day.all(self.db_conn, ensure_betweens=True, date_range=('yesterday', 'today')))
+ self.assertEqual([day2, Day(Day.tomorrows_date())],
+ Day.all(self.db_conn, ensure_betweens=True, date_range=('today', 'tomorrow')))
+
+ def test_TodoTemplate_by_id(self):
+ tmpl = TodoTemplate(self.db_conn)
+ tmpl.save(self.db_conn)
+ retrieved = TodoTemplate.by_id(self.db_conn, tmpl.id_)
+ self.assertEqual(tmpl.id_, retrieved.id_)
+ self.assertEqual(None, TodoTemplate.by_id(self.db_conn, tmpl.id_ + 1))
+ self.assertIsInstance(TodoTemplate.by_id(self.db_conn, tmpl.id_ + 1, make_if_none=True), TodoTemplate)
+
+ def test_TodoTemplate_all(self):
+ tmpl_1 = TodoTemplate(self.db_conn)
+ tmpl_1.save(self.db_conn)
+ tmpl_2 = TodoTemplate(self.db_conn)
+ tmpl_2.save(self.db_conn)
+ self.assertEqual({tmpl_1.id_, tmpl_2.id_}, set(t.id_ for t in TodoTemplate.all(self.db_conn)))
+
+ def test_versioned_attributes(self):
+ def test(name, default, values):
+ def wait_till_next_timestamp(timestamp):
+ # if we .set() to early, the timestamp key will not have changed,
+ # i.e. we'd simply over-write the previous value
+ while Day.todays_date(with_time=True) == timestamp:
+ sleep(0.0001)
+ return Day.todays_date(with_time=True)
+ tmpl = TodoTemplate(self.db_conn)
+ tmpl.save(self.db_conn)
+ tmpl_attr = getattr(tmpl, name)
+ # check we get default value on empty history
+ self.assertEqual(tmpl_attr.newest, default)
+ self.assertEqual({}, tmpl_attr.history)
+ # check we get new value when set
+ timestamp_1 = Day.todays_date(with_time=True)
+ tmpl_attr.set(values[0])
+ self.assertEqual(tmpl_attr.newest, values[0])
+ # check history remains unchanged if setting same value as .newest
+ timestamp_2 = wait_till_next_timestamp(timestamp_1)
+ tmpl_attr.set(values[0])
+ self.assertEqual(len(tmpl_attr.history), 1)
+ # check we can access different values with .newest and .at
+ tmpl_attr.set(values[1])
+ self.assertEqual(tmpl_attr.newest, values[1])
+ self.assertEqual(tmpl_attr.at(timestamp_1), values[0])
+ # check attribute history stored in DB with parent
+ tmpl.save(self.db_conn)
+ retrieved = TodoTemplate.by_id(self.db_conn, tmpl.id_)
+ self.assertEqual(getattr(retrieved, name).at(timestamp_1), values[0]) # i.e. attribute.save() works
+ # check forks can use original's history, but only up to their fork moment
+ fork = tmpl.fork(self.db_conn)
+ wait_till_next_timestamp(timestamp_2)
+ tmpl_attr.set(values[2])
+ forked_attr = getattr(fork, name)
+ self.assertEqual(forked_attr.newest, values[1])
+ self.assertEqual(forked_attr.at(timestamp_1), values[0])
+ # check original and fork bifurcate their history
+ forked_attr.set(values[0])
+ self.assertEqual(forked_attr.newest, values[0])
+ self.assertEqual(tmpl_attr.newest, values[2])
+ test('title', 'UNNAMED', ['foo', 'bar', 'baz'])
+ test('default_effort', 1.0, [0.5, 3, 9])
+ test('description', '', ['foo', 'bar', 'baz'])
+
+
+
+class TestSansDB(TestCase):
+
+ def test_Day_date_validate(self):
+ self.assertEqual(None, Day.date_valid('foo'))
+ self.assertEqual(None, Day.date_valid('2024-02-30'))
+ self.assertEqual(None, Day.date_valid('2024-01-01 23:59:59'))
+ self.assertEqual(datetime(2024,1,1), Day.date_valid('2024-01-01'))
+
+ def test_Day_date_classmethods(self):
+ self.assertEqual(str(datetime.now())[:10], Day.todays_date())
+ self.assertEqual(str(datetime.now())[:DATETIME_KEY_RESOLUTION], Day.todays_date(with_time=True))
+ self.assertEqual(str(datetime.now() - timedelta(days=1))[:10], Day.yesterdays_date())
+ self.assertEqual(str(datetime.now() + timedelta(days=1))[:10], Day.tomorrows_date())
+
+ def test_Day_init(self):
+ test_date = '2024-02-29'
+ test_comment = 'foo'
+ day = Day(test_date)
+ self.assertEqual(day.date, test_date)
+ self.assertEqual(day.comment, '')
+ day = Day(test_date, test_comment)
+ self.assertEqual(day.comment, test_comment)
+ with self.assertRaises(HandledException):
+ Day('foo')
+
+ def test_Day_date_neighbors(self):
+ day = Day('2024-02-29')
+ self.assertEqual(day.prev_date, '2024-02-28')
+ self.assertEqual(day.next_date, '2024-03-01')
+
+ def test_Day_date_weekday(self):
+ day = Day('2024-02-29')
+ self.assertEqual(day.weekday, 'Thursday')
+
+ def test_Day_cmp(self):
+ day1 = Day('2024-01-01')
+ day2 = Day('2024-01-02')
+ day3 = Day('2024-01-03')
+ days = [day3, day1, day2]
+ self.assertEqual(sorted(days), [day1, day2, day3])