2 from sqlite3 import connect as sql_connect
3 from http.server import BaseHTTPRequestHandler
4 from urllib.parse import parse_qs
5 from datetime import datetime, timedelta
6 from unittest import TestCase
7 from os.path import isfile
12 PATH_DB_SCHEMA='init.sql'
18 DATE_FORMAT = '%Y-%m-%d'
19 DATETIME_KEY_RESOLUTION = 24
23 ######## basic system stuff ############
25 class HandledException(Exception):
28 from sys import exit as sys_exit
29 print(f'ABORTING: {self}')
36 def __init__(self, path, force_creation=False):
38 if not isfile(self.path):
39 self._make_new_if_wanted_else_abort(force_creation)
40 self._validate_schema()
42 def _make_new_if_wanted_else_abort(self, force_creation):
43 if not force_creation:
44 create_question = f'Database file not found: {self.path}. Create? Y/n\n'
45 msg_on_no = 'Interpreting reply as "no", but cannot run without database file.'
46 legal_yesses = {'y', 'yes'}
47 create_reply = input(create_question)
48 if not create_reply.lower() in legal_yesses:
49 raise HandledException(msg_on_no)
50 with sql_connect(self.path) as conn:
51 with open(PATH_DB_SCHEMA, 'r') as f:
52 conn.executescript(f.read())
54 def _validate_schema(self):
55 sql_for_schema = 'SELECT sql FROM sqlite_master ORDER BY sql'
56 msg_wrong_schema = 'Database has wrong tables schema. Diff:\n'
57 with sql_connect(self.path) as conn:
58 schema = ';\n'.join([r[0] for r in conn.execute(sql_for_schema) if r[0]]) + ';'
59 with open(PATH_DB_SCHEMA, 'r') as f:
60 stored_schema = f.read().rstrip()
61 if schema != stored_schema:
62 from difflib import Differ
64 diff_msg = d.compare(schema.splitlines(), stored_schema.splitlines())
65 raise HandledException(msg_wrong_schema + '\n'.join(diff_msg))
68 from shutil import copy2
69 from random import randint
70 copy2(self.path, f'{self.path}.bak.0')
72 if 1 == randint(0, 2**i):
73 copy2(self.path, f'{self.path}.bak.{i+1}')
78 class TodoDBConnection:
80 def __init__(self, db_file):
82 self.conn = sql_connect(self.file.path)
88 def exec(self, code, inputs=None):
91 return self.conn.execute(code, inputs)
98 ######## models ############
100 class VersionedAttribute:
102 def __init__(self, db_conn, parent, name, default):
105 self.default = default
107 for row in db_conn.exec(f'SELECT * FROM {self.table_name} WHERE template = ?',
109 self.history[row[1]] = row[2]
112 def table_name(self):
113 return f'versioned_{self.name}s'
115 def save(self, db_conn):
116 for date, value in self.history.items():
117 db_conn.exec(f'REPLACE INTO {self.table_name} VALUES (?, ?, ?)',
118 (self.parent.id_, date, value))
121 def _newest_datetime(self):
122 return sorted(self.history.keys())[-1]
124 def _forked_value_at(self, date_with_time):
125 return getattr(self.parent.forked_from, self.name).at(date_with_time)
129 if 0 == len(self.history):
130 if self.parent.forked_from:
131 return self._forked_value_at(self.parent.forked_at)
133 return self.history[self._newest_datetime]
135 def at(self, date_with_time):
136 sorted_datetimes = sorted(self.history.keys())
137 if self.parent.forked_from and (0 == len(sorted_datetimes) or sorted_datetimes[0] > date_with_time):
138 return self._forked_value_at(date_with_time)
139 elif 0 == len(sorted_datetimes):
141 ret = self.history[sorted_datetimes[0]]
142 for k, v in self.history.items():
143 if k > date_with_time:
148 def set(self, value):
149 if 0 == len(self.history) or value != self.history[self._newest_datetime]:
150 self.history[Day.todays_date(with_time=True)] = value
156 def __init__(self, date, comment=''):
158 self.datetime = self.__class__.date_valid(self.date)
159 if not self.datetime:
160 raise HandledException(f'Provided date is of wrong format: {date}')
161 self.comment = comment
164 def from_row(cls, row):
165 return cls(row[0], row[1])
168 def all(cls, db_conn, ensure_betweens=False, date_range=('', '')):
169 legal_day_names = {'yesterday', 'today', 'tomorrow'}
170 for val in [val for val in date_range
171 if val != '' and val not in legal_day_names
172 and not cls.date_valid(val)]:
173 raise HandledException(f'Provided date is of wrong format: {val}')
174 start_str = date_range[0] if date_range[0] else '2024-01-01'
175 end_str = date_range[1] if date_range[1] else '2030-12-31'
176 start_date = getattr(cls, f'{start_str}s_date')() if start_str in legal_day_names else start_str
177 end_date = getattr(cls, f'{end_str}s_date')() if end_str in legal_day_names else end_str
178 if start_date > end_date:
180 end_date = start_date
183 for row in db_conn.exec('SELECT * FROM days WHERE date >= ? AND date <= ?',
184 (start_date, end_date)):
185 days += [cls.from_row(row)]
187 if '' != date_range[0] and not start_date in [d.date for d in days]:
188 days += [cls(start_date)]
189 if '' != date_range[1] and not end_date in [d.date for d in days]:
190 days += [cls(end_date)]
192 if ensure_betweens and len(days) > 1:
194 for i, day in enumerate(days):
195 gapless_days += [day]
196 if i < len(days) - 1:
197 while day.next_date != days[i+1].date:
198 day = Day(day.next_date)
199 gapless_days += [day]
204 def by_date(cls, db_conn, date, make_if_none=False):
205 for row in db_conn.exec('SELECT * FROM days WHERE date = ?', (date,)):
206 return cls.from_row(row)
207 return cls(date) if make_if_none else None
209 def save(self, db_conn):
210 db_conn.exec('REPLACE INTO days VALUES (?, ?)', (self.date, self.comment))
213 def date_valid(cls, date):
215 result = datetime.strptime(date, DATE_FORMAT)
216 except (ValueError, TypeError):
221 def todays_date(cls, with_time=False):
222 cut_length = DATETIME_KEY_RESOLUTION if with_time else 10
223 return str(datetime.now())[:cut_length]
226 def yesterdays_date(cls):
227 return str(datetime.now() - timedelta(days=1))[:10]
230 def tomorrows_date(cls):
231 return str(datetime.now() + timedelta(days=1))[:10]
235 next_datetime = self.datetime + timedelta(days=1)
236 return next_datetime.strftime(DATE_FORMAT)
240 prev_datetime = self.datetime - timedelta(days=1)
241 return prev_datetime.strftime(DATE_FORMAT)
245 return self.datetime.strftime('%A')
247 def __eq__(self, other):
248 return self.date == other.date
250 def __lt__(self, other):
251 return self.date < other.date
257 def __init__(self, db_conn, id_=None, forked_from=None, forked_at=None):
259 self.forked_from = TodoTemplate.by_id(db_conn, forked_from)
260 self.forked_at = forked_at
261 self.title = VersionedAttribute(db_conn, self, 'title', 'UNNAMED')
262 self.default_effort = VersionedAttribute(db_conn, self, 'default_effort', 1.0)
263 self.description = VersionedAttribute(db_conn, self, 'description', '')
266 def from_row(cls, db_conn, row):
267 return cls(db_conn, row[0], row[1], row[2])
270 def all(cls, db_conn):
272 for row in db_conn.exec('SELECT * FROM templates'):
273 tmpls += [cls.from_row(db_conn, row)]
277 def by_id(cls, db_conn, id_, make_if_none=False):
278 for row in db_conn.exec('SELECT * FROM templates WHERE id = ?', (id_,)):
279 return cls.from_row(db_conn, row)
281 return cls(db_conn, id_, None, None)
284 def save(self, db_conn):
285 cursor = db_conn.exec('REPLACE INTO templates VALUES (?,?,?) RETURNING ID',
286 (self.id_, self.forked_from.id_ if self.forked_from else None, self.forked_at))
288 self.id_ = cursor.fetchone()[0]
289 self.title.save(db_conn)
290 self.default_effort.save(db_conn)
291 self.description.save(db_conn)
293 def fork(self, db_conn):
294 return self.__class__(db_conn, id_=None, forked_from=self.id_, forked_at=Day.todays_date(with_time=True))
298 ######## web stuff ############
302 def __init__(self, url_query, site_cookie):
303 self.params = parse_qs(url_query, keep_blank_values=True)
304 self.cookie = site_cookie
306 def get(self, key, default):
307 return self.params.get(key, [default])[0]
309 def get_cookied(self, key, default):
310 val = self.params.get(key, [self.cookie.get(key, default)])[0]
311 self.cookie[key] = val
318 def __init__(self, site, headers):
319 from http.cookies import SimpleCookie
320 from json import loads as json_loads
323 self.full = SimpleCookie(headers['Cookie']) if 'Cookie' in headers.keys() else SimpleCookie()
324 if site in self.full.keys():
325 self.of_site = json_loads(self.full[site].value)
327 def send_headers(self):
328 from json import dumps as json_dumps
329 self.full[self.site] = json_dumps(self.of_site)
330 return [('Set-Cookie', morsel.OutputString()) for morsel in self.full.values()]
333 for morsel in self.full.values():
334 morsel['expires'] = 'Thu, 01 Jan 1970 00:00:00 GMT'
338 class TodoHandler(BaseHTTPRequestHandler):
341 from os.path import split as path_split
342 from urllib.parse import urlparse
343 parsed_url = urlparse(self.path)
344 self.site = path_split(parsed_url.path)[1]
345 if 0 == len(self.site):
346 self.site = 'calendar'
347 self.cookie = CookieDB(self.site, self.headers)
348 self.params = ParamsParser(parsed_url.query, self.cookie.of_site)
349 self.db_conn = TodoDBConnection(self.server.db_file)
351 def send_code_and_headers(self, code, headers, set_cookies=True):
352 self.send_response(code)
354 [self.send_header(h[0], h[1]) for h in self.cookie.send_headers()]
355 for fieldname, content in headers:
356 self.send_header(fieldname, content)
359 def redirect(self, url):
360 self.send_code_and_headers(302, [('Location', url)])
362 def send_HTML(self, html, code=200):
363 self.send_code_and_headers(code, [('Content-type', 'text/html')])
364 self.wfile.write(bytes(html, 'utf-8'))
366 def send_fail(self, msg, code=400):
367 html = self.server.html.get_template('msg.html').render(msg=f'Exception: {msg}')
368 self.send_code_and_headers(code, [('Content-type', 'text/html')], set_cookies=False)
369 self.wfile.write(bytes(html, 'utf-8'))
376 length = int(self.headers['content-length'])
378 self.postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1)
379 if self.site in {'day', 'template'}:
380 getattr(self, f'do_POST_{self.site}')()
381 self.db_conn.commit()
383 self.redirect(self.redir_url)
384 except HandledException as msg:
387 def do_POST_day(self):
388 date = self.postvars['date'][0]
389 day = Day(date, self.postvars['comment'][0])
390 day.save(self.db_conn)
392 def do_POST_template(self):
393 id_ = self.params.get('id', None)
394 tmpl = TodoTemplate.by_id(self.db_conn, id_, make_if_none=True)
395 tmpl.title.set(self.postvars['title'][0])
396 tmpl.default_effort.set(float(self.postvars['default_effort'][0]))
397 tmpl.description.set(self.postvars['description'][0])
398 tmpl.save(self.db_conn)
399 self.redir_url = 'templates'
406 if self.site in {'day', 'template', 'templates', 'reset_cookie', 'calendar'}:
407 page = getattr(self, f'do_GET_{self.site}')()
409 page = self.do_GET_calendar()
412 except HandledException as msg:
415 def do_GET_reset_cookie(self):
417 msg = 'Cookie has been re-set!'
418 return self.server.html.get_template('msg.html').render(msg=msg)
420 def do_GET_day(self):
421 date = self.params.get_cookied('id', Day.todays_date())
422 day = Day.by_date(self.db_conn, date, make_if_none=True)
423 return self.server.html.get_template('day.html').render(day=day)
425 def do_GET_calendar(self):
426 from_ = self.params.get_cookied('from', 'yesterday')
427 to = self.params.get_cookied('to', '')
428 days = Day.all(self.db_conn, ensure_betweens=True, date_range=(from_, to))
429 return self.server.html.get_template('calendar.html').render(
430 days=days, from_=from_, to=to)
432 def do_GET_template(self):
433 id_ = self.params.get('id', None)
434 template = TodoTemplate.by_id(self.db_conn, id_, make_if_none=True)
435 return self.server.html.get_template('template.html').render(tmpl=template)
437 def do_GET_templates(self):
438 templates = TodoTemplate.all(self.db_conn)
439 return self.server.html.get_template('templates.html').render(templates=templates)
446 from http.server import HTTPServer
447 from os import environ
448 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
449 path_todo_db = environ.get('TODO_DB')
451 raise HandledException('TODO_DB environment variable not set.')
452 db_file = TodoDBFile(path_todo_db)
453 server = HTTPServer(('localhost', HTTP_PORT), TodoHandler)
454 server.db_file = db_file
455 server.html = JinjaEnv(loader=JinjaFSLoader(HTML_DIR))
456 print(f'running at http://localhost:{HTTP_PORT}')
458 server.serve_forever()
459 except KeyboardInterrupt:
460 print('\ncaught KeyboardInterrupt, stopping server')
461 server.server_close()
465 if __name__ == '__main__':
468 except HandledException as e:
473 # testing – run with: python3 -m unittest todo.py
475 class TestWithDB(TestCase):
478 self.db_file = TodoDBFile(f'test_db:{datetime.now().timestamp()}', force_creation=True)
479 self.db_conn = TodoDBConnection(self.db_file)
480 self.bak_0_path = f'{self.db_file.path}.bak.0'
483 from os import remove
484 remove(self.db_file.path)
485 for i in range(0, 9):
486 bak_path = f'{self.db_file.path}.bak.{i}'
487 if isfile(f'{self.db_file.path}.bak.{i}'):
490 def test_backup_bak_0_file_content(self):
491 day = Day('2024-01-01')
492 day.save(self.db_conn)
493 self.db_conn.commit() # backups before writing, so expect different file contents
494 with open(self.db_file.path, 'rb') as f1:
495 original_content = f1.read()
496 with open(self.bak_0_path, 'rb') as f2:
497 backup_content = f2.read()
498 self.assertNotEqual(original_content, backup_content)
499 self.db_conn.commit() # this time commit without changes, so expect equal file contents
500 with open(self.db_file.path, 'rb') as f1:
501 original_content = f1.read()
502 with open(self.bak_0_path, 'rb') as f2:
503 backup_content = f2.read()
504 self.assertEqual(original_content, backup_content)
506 def test_backup_bak_0_file_attributes(self):
508 sleep(0.1) # so mtime would change if not copied
509 self.db_conn.commit()
510 original_stat = stat(self.db_file.path)
511 backup_stat = stat(self.bak_0_path)
512 for name in {'st_mode', 'st_dev', 'st_nlink', 'st_uid', 'st_gid', 'st_size', 'st_atime', 'st_mtime'}:
513 self.assertEqual(getattr(original_stat, name), getattr(backup_stat, name))
515 def test_Day_by_date(self):
516 test_date_1 = '2024-02-28'
517 test_date_2 = '2024-02-29'
519 day1 = Day(test_date_1, test_comment)
520 day1.save(self.db_conn)
521 retrieved_day = Day.by_date(self.db_conn, test_date_1)
522 self.assertEqual(day1, retrieved_day)
523 self.assertEqual(day1.comment, retrieved_day.comment)
524 self.assertEqual(None, Day.by_date(self.db_conn, test_date_2))
525 self.assertEqual(Day(test_date_2), Day.by_date(self.db_conn, test_date_2, make_if_none=True))
527 def test_Day_all(self):
528 with self.assertRaises(HandledException):
529 Day.all(self.db_conn, date_range=(None, None))
530 Day.all(self.db_conn, date_range=('foo', ''))
531 Day.all(self.db_conn, date_range=('', '2024-02-30'))
532 test_date_1 = str(datetime.now() - timedelta(days=2))[:10]
533 test_date_2 = Day.todays_date()
534 test_date_3 = str(datetime.now() + timedelta(days=2))[:10]
535 day1 = Day(test_date_1)
536 day2 = Day(test_date_2)
537 day3 = Day(test_date_3)
538 day1.save(self.db_conn)
539 day2.save(self.db_conn)
540 day3.save(self.db_conn)
541 self.assertEqual([day1, day2, day3], Day.all(self.db_conn))
542 self.assertEqual([day2, day3], Day.all(self.db_conn, date_range=('yesterday', '')))
543 self.assertEqual([day2, day3], Day.all(self.db_conn, date_range=(Day.yesterdays_date(), '')))
544 self.assertEqual([day3], Day.all(self.db_conn, date_range=('tomorrow', '')))
545 self.assertEqual([day3], Day.all(self.db_conn, date_range=(Day.tomorrows_date(), '')))
546 self.assertEqual([day2], Day.all(self.db_conn, date_range=('today', Day.todays_date())))
547 self.assertEqual([day1], Day.all(self.db_conn, date_range=('', 'yesterday')))
548 self.assertEqual([Day(Day.yesterdays_date()), day2],
549 Day.all(self.db_conn, ensure_betweens=True, date_range=('yesterday', 'today')))
550 self.assertEqual([day2, Day(Day.tomorrows_date())],
551 Day.all(self.db_conn, ensure_betweens=True, date_range=('today', 'tomorrow')))
553 def test_TodoTemplate_by_id(self):
554 tmpl = TodoTemplate(self.db_conn)
555 tmpl.save(self.db_conn)
556 retrieved = TodoTemplate.by_id(self.db_conn, tmpl.id_)
557 self.assertEqual(tmpl.id_, retrieved.id_)
558 self.assertEqual(None, TodoTemplate.by_id(self.db_conn, tmpl.id_ + 1))
559 self.assertIsInstance(TodoTemplate.by_id(self.db_conn, tmpl.id_ + 1, make_if_none=True), TodoTemplate)
561 def test_TodoTemplate_all(self):
562 tmpl_1 = TodoTemplate(self.db_conn)
563 tmpl_1.save(self.db_conn)
564 tmpl_2 = TodoTemplate(self.db_conn)
565 tmpl_2.save(self.db_conn)
566 self.assertEqual({tmpl_1.id_, tmpl_2.id_}, set(t.id_ for t in TodoTemplate.all(self.db_conn)))
568 def test_versioned_attributes(self):
569 def test(name, default, values):
570 def wait_till_next_timestamp(timestamp):
571 # if we .set() to early, the timestamp key will not have changed,
572 # i.e. we'd simply over-write the previous value
573 while Day.todays_date(with_time=True) == timestamp:
575 return Day.todays_date(with_time=True)
576 tmpl = TodoTemplate(self.db_conn)
577 tmpl.save(self.db_conn)
578 tmpl_attr = getattr(tmpl, name)
579 # check we get default value on empty history
580 self.assertEqual(tmpl_attr.newest, default)
581 self.assertEqual({}, tmpl_attr.history)
582 # check we get new value when set
583 timestamp_1 = Day.todays_date(with_time=True)
584 tmpl_attr.set(values[0])
585 self.assertEqual(tmpl_attr.newest, values[0])
586 # check history remains unchanged if setting same value as .newest
587 timestamp_2 = wait_till_next_timestamp(timestamp_1)
588 tmpl_attr.set(values[0])
589 self.assertEqual(len(tmpl_attr.history), 1)
590 # check we can access different values with .newest and .at
591 tmpl_attr.set(values[1])
592 self.assertEqual(tmpl_attr.newest, values[1])
593 self.assertEqual(tmpl_attr.at(timestamp_1), values[0])
594 # check attribute history stored in DB with parent
595 tmpl.save(self.db_conn)
596 retrieved = TodoTemplate.by_id(self.db_conn, tmpl.id_)
597 self.assertEqual(getattr(retrieved, name).at(timestamp_1), values[0]) # i.e. attribute.save() works
598 # check forks can use original's history, but only up to their fork moment
599 fork = tmpl.fork(self.db_conn)
600 wait_till_next_timestamp(timestamp_2)
601 tmpl_attr.set(values[2])
602 forked_attr = getattr(fork, name)
603 self.assertEqual(forked_attr.newest, values[1])
604 self.assertEqual(forked_attr.at(timestamp_1), values[0])
605 # check original and fork bifurcate their history
606 forked_attr.set(values[0])
607 self.assertEqual(forked_attr.newest, values[0])
608 self.assertEqual(tmpl_attr.newest, values[2])
609 test('title', 'UNNAMED', ['foo', 'bar', 'baz'])
610 test('default_effort', 1.0, [0.5, 3, 9])
611 test('description', '', ['foo', 'bar', 'baz'])
615 class TestSansDB(TestCase):
617 def test_Day_date_validate(self):
618 self.assertEqual(None, Day.date_valid('foo'))
619 self.assertEqual(None, Day.date_valid('2024-02-30'))
620 self.assertEqual(None, Day.date_valid('2024-01-01 23:59:59'))
621 self.assertEqual(datetime(2024,1,1), Day.date_valid('2024-01-01'))
623 def test_Day_date_classmethods(self):
624 self.assertEqual(str(datetime.now())[:10], Day.todays_date())
625 self.assertEqual(str(datetime.now())[:DATETIME_KEY_RESOLUTION], Day.todays_date(with_time=True))
626 self.assertEqual(str(datetime.now() - timedelta(days=1))[:10], Day.yesterdays_date())
627 self.assertEqual(str(datetime.now() + timedelta(days=1))[:10], Day.tomorrows_date())
629 def test_Day_init(self):
630 test_date = '2024-02-29'
633 self.assertEqual(day.date, test_date)
634 self.assertEqual(day.comment, '')
635 day = Day(test_date, test_comment)
636 self.assertEqual(day.comment, test_comment)
637 with self.assertRaises(HandledException):
640 def test_Day_date_neighbors(self):
641 day = Day('2024-02-29')
642 self.assertEqual(day.prev_date, '2024-02-28')
643 self.assertEqual(day.next_date, '2024-03-01')
645 def test_Day_date_weekday(self):
646 day = Day('2024-02-29')
647 self.assertEqual(day.weekday, 'Thursday')
649 def test_Day_cmp(self):
650 day1 = Day('2024-01-01')
651 day2 = Day('2024-01-02')
652 day3 = Day('2024-01-03')
653 days = [day3, day1, day2]
654 self.assertEqual(sorted(days), [day1, day2, day3])