+#!/usr/bin/env python3
+from sqlite3 import connect as sql_connect
+from http.server import BaseHTTPRequestHandler
+from urllib.parse import parse_qs
+from datetime import datetime, timedelta
+
+
+
+PATH_DB_SCHEMA='init.sql'
+HTTP_PORT=8082
+HTML_DIR='html'
+
+
+
+DATE_FORMAT = '%Y-%m-%d'
+
+
+
+######## basic system stuff ############
+
+class HandledException(Exception):
+
+ def nice_exit(self):
+ from sys import exit as sys_exit
+ print(f'ABORTING: {self}')
+ sys_exit(1)
+
+
+
+class TodoDBFile:
+
+ def __init__(self, path):
+ from os.path import isfile
+ self.path = path
+ if not isfile(self.path):
+ self.make_new_if_wanted_else_abort()
+ self.validate_schema()
+
+ def make_new_if_wanted_else_abort(self):
+ create_question = f'Database file not found: {self.path}. Create? Y/n\n'
+ msg_on_no = 'Interpreting reply as "no", but cannot run without database file.'
+ legal_yesses = {'y', 'yes'}
+ create_reply = input(create_question)
+ if not create_reply.lower() in legal_yesses:
+ raise HandledException(msg_on_no)
+ with sql_connect(self.path) as conn:
+ with open(PATH_DB_SCHEMA, 'r') as f:
+ conn.executescript(f.read())
+
+ def validate_schema(self):
+ sql_for_schema = 'SELECT sql FROM sqlite_master ORDER BY sql'
+ msg_wrong_schema = 'Database has wrong tables schema. Diff:\n'
+ with sql_connect(self.path) as conn:
+ schema = ';\n'.join([r[0] for r in conn.execute(sql_for_schema) if r[0]]) + ';'
+ with open(PATH_DB_SCHEMA, 'r') as f:
+ stored_schema = f.read().rstrip()
+ if schema != stored_schema:
+ from difflib import Differ
+ d = Differ()
+ diff_msg = d.compare(schema.splitlines(), stored_schema.splitlines())
+ raise HandledException(msg_wrong_schema + '\n'.join(diff_msg))
+
+ def backup(self):
+ from shutil import copy2
+ from random import randint
+ copy2(self.path, f'{self.path}.bak.0')
+ for i in range(0, 6):
+ if 1 == randint(0, 2**i):
+ copy2(self.path, f'{self.path}.bak.{i+1}')
+ break
+
+
+
+class TodoDBConnection:
+
+ def __init__(self, db_file):
+ self.file = db_file
+ self.conn = sql_connect(self.file.path)
+
+ def commit(self):
+ self.file.backup()
+ self.conn.commit()
+
+ def exec(self, code, inputs=None):
+ if not inputs:
+ inputs = []
+ return self.conn.execute(code, inputs)
+
+ def close(self):
+ self.conn.close()
+
+
+
+######## models ############
+
+class VersionedAttribute:
+
+ def __init__(self, db_conn, parent, name, default):
+ self.parent = parent
+ self.name = name
+ self.default = default
+ self.history = {}
+ for row in db_conn.exec(f'SELECT * FROM {self.table_name} WHERE template = ?',
+ (self.parent.id_,)):
+ self.history[row[1]] = row[2]
+
+ @property
+ def table_name(self):
+ return f'versioned_{self.name}s'
+
+ def save(self, db_conn):
+ for date, value in self.history.items():
+ db_conn.exec(f'REPLACE INTO {self.table_name} VALUES (?, ?, ?)',
+ (self.parent.id_, date, value))
+
+ @property
+ def newest_date(self):
+ return sorted(self.history.keys())[-1]
+
+ @property
+ def newest(self):
+ if 0 == len(self.history):
+ if self.parent.forked_from:
+ return getattr(self.parent.forked_from, self.name).newest
+ return self.default
+ return self.history[self.newest_date]
+
+ def set(self, value):
+ if 0 == len(self.history) or value != self.history[self.newest_date]:
+ self.history[Day.todays_date(with_time=True)] = value
+
+
+
+class Day:
+
+ def __init__(self, date, comment=''):
+ self.date = date
+ self.datetime = self.__class__.date_valid(self.date)
+ if not self.datetime:
+ raise HandledException(f'Provided date is of wrong format: {date}')
+ self.comment = comment
+
+ @classmethod
+ def from_row(cls, row):
+ return cls(row[0], row[1])
+
+ @classmethod
+ def all(cls, db_conn, ensure_betweens=False, date_range=('', '')):
+ legal_day_names = {'yesterday', 'today', 'tomorrow'}
+ for val in [val for val in date_range
+ if val != '' and val not in legal_day_names
+ and not cls.date_valid(val)]:
+ raise HandledException(f'Provided date is of wrong format: {val}')
+ start_str = date_range[0] if date_range[0] else '2024-01-01'
+ end_str = date_range[1] if date_range[1] else '2030-12-31'
+ start_date = getattr(cls, f'{start_str}s_date')() if start_str in legal_day_names else start_str
+ end_date = getattr(cls, f'{end_str}s_date')() if end_str in legal_day_names else end_str
+ if start_date > end_date:
+ temp = end_date
+ end_date = start_date
+ start_date = temp
+ days = []
+ for row in db_conn.exec('SELECT * FROM days WHERE date >= ? AND date <= ?',
+ (start_date, end_date)):
+ days += [cls.from_row(row)]
+ if ensure_betweens:
+ if '' != date_range[0] and not start_date in [d.date for d in days]:
+ days += [cls(start_date)]
+ if '' != date_range[1] and not end_date in [d.date for d in days]:
+ days += [cls(end_date)]
+ days.sort()
+ if ensure_betweens and len(days) > 1:
+ gapless_days = []
+ for i, day in enumerate(days):
+ gapless_days += [day]
+ if i < len(days) - 1:
+ while day.next_date != days[i+1].date:
+ day = Day(day.next_date)
+ gapless_days += [day]
+ days = gapless_days
+ return days
+
+ @classmethod
+ def by_date(cls, db_conn, date, make_if_none=False):
+ for row in db_conn.exec('SELECT * FROM days WHERE date = ?', (date,)):
+ return cls.from_row(row)
+ return cls(date) if make_if_none else None
+
+ def save(self, db_conn):
+ db_conn.exec('REPLACE INTO days VALUES (?, ?)', (self.date, self.comment))
+
+ @classmethod
+ def date_valid(cls, date):
+ try:
+ result = datetime.strptime(date, DATE_FORMAT)
+ except ValueError:
+ return None
+ return result
+
+ @classmethod
+ def todays_date(cls, with_time=False):
+ cut_length = 19 if with_time else 10
+ return str(datetime.now())[:cut_length]
+
+ @classmethod
+ def yesterdays_date(cls):
+ return str(datetime.now() - timedelta(days=1))[:10]
+
+ @classmethod
+ def tomorrows_date(cls):
+ return str(datetime.now() + timedelta(days=1))[:10]
+
+ @property
+ def next_date(self):
+ next_datetime = self.datetime + timedelta(days=1)
+ return next_datetime.strftime(DATE_FORMAT)
+
+ @property
+ def prev_date(self):
+ prev_datetime = self.datetime - timedelta(days=1)
+ return prev_datetime.strftime(DATE_FORMAT)
+
+ @property
+ def weekday(self):
+ return self.datetime.strftime('%A')
+
+ def __eq__(self, other):
+ return self.date == other.date
+
+ def __lt__(self, other):
+ return self.date < other.date
+
+
+
+class TodoTemplate:
+
+ def __init__(self, db_conn, id_):
+ self.id_ = id_
+ self.forked_from = None
+ self.title = VersionedAttribute(db_conn, self, 'title', 'UNNAMED')
+ self.default_effort = VersionedAttribute(db_conn, self, 'default_effort', 1.0)
+ self.description = VersionedAttribute(db_conn, self, 'description', '')
+
+ @classmethod
+ def from_row(cls, db_conn, row):
+ return cls(db_conn, row[0])
+
+ @classmethod
+ def all(cls, db_conn):
+ tmpls = []
+ for row in db_conn.exec('SELECT * FROM templates'):
+ tmpls += [cls.from_row(db_conn, row)]
+ return tmpls
+
+ @classmethod
+ def by_id(cls, db_conn, id_, make_if_none=False):
+ for row in db_conn.exec('SELECT * FROM templates WHERE id = ?', (id_,)):
+ return cls.from_row(db_conn, row)
+ if make_if_none:
+ return cls(db_conn, id_)
+ return None
+
+ def save(self, db_conn):
+ cursor = db_conn.exec('REPLACE INTO templates VALUES (?) RETURNING ID', (self.id_,))
+ if self.id_ is None:
+ self.id_ = cursor.fetchone()[0]
+ self.title.save(db_conn)
+ self.default_effort.save(db_conn)
+ self.description.save(db_conn)
+
+
+
+######## web stuff ############
+
+class ParamsParser:
+
+ def __init__(self, url_query, site_cookie):
+ self.params = parse_qs(url_query, keep_blank_values=True)
+ self.cookie = site_cookie
+
+ def get(self, key, default):
+ return self.params.get(key, [default])[0]
+
+ def get_cookied(self, key, default):
+ val = self.params.get(key, [self.cookie.get(key, default)])[0]
+ self.cookie[key] = val
+ return val
+
+
+
+class CookieDB:
+
+ def __init__(self, site, headers):
+ from http.cookies import SimpleCookie
+ from json import loads as json_loads
+ self.site = site
+ self.of_site = {}
+ self.full = SimpleCookie(headers['Cookie']) if 'Cookie' in headers.keys() else SimpleCookie()
+ if site in self.full.keys():
+ self.of_site = json_loads(self.full[site].value)
+
+ def send_headers(self):
+ from json import dumps as json_dumps
+ self.full[self.site] = json_dumps(self.of_site)
+ return [('Set-Cookie', morsel.OutputString()) for morsel in self.full.values()]
+
+ def reset(self):
+ for morsel in self.full.values():
+ morsel['expires'] = 'Thu, 01 Jan 1970 00:00:00 GMT'
+
+
+
+class TodoHandler(BaseHTTPRequestHandler):
+
+ def init(self):
+ from os.path import split as path_split
+ from urllib.parse import urlparse
+ parsed_url = urlparse(self.path)
+ self.site = path_split(parsed_url.path)[1]
+ if 0 == len(self.site):
+ self.site = 'calendar'
+ self.cookie = CookieDB(self.site, self.headers)
+ self.params = ParamsParser(parsed_url.query, self.cookie.of_site)
+ self.db_conn = TodoDBConnection(self.server.db_file)
+
+ def send_code_and_headers(self, code, headers, set_cookies=True):
+ self.send_response(code)
+ if set_cookies:
+ [self.send_header(h[0], h[1]) for h in self.cookie.send_headers()]
+ for fieldname, content in headers:
+ self.send_header(fieldname, content)
+ self.end_headers()
+
+ def redirect(self, url):
+ self.send_code_and_headers(302, [('Location', url)])
+
+ def send_HTML(self, html, code=200):
+ self.send_code_and_headers(code, [('Content-type', 'text/html')])
+ self.wfile.write(bytes(html, 'utf-8'))
+
+ def send_fail(self, msg, code=400):
+ html = self.server.html.get_template('msg.html').render(msg=f'Exception: {msg}')
+ self.send_code_and_headers(code, [('Content-type', 'text/html')], set_cookies=False)
+ self.wfile.write(bytes(html, 'utf-8'))
+
+ # POST routes
+
+ def do_POST(self):
+ try:
+ self.init()
+ length = int(self.headers['content-length'])
+ self.redir_url = '/'
+ self.postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1)
+ if self.site in {'day', 'template'}:
+ getattr(self, f'do_POST_{self.site}')()
+ self.db_conn.commit()
+ self.db_conn.close()
+ self.redirect(self.redir_url)
+ except HandledException as msg:
+ self.send_fail(msg)
+
+ def do_POST_day(self):
+ date = self.postvars['date'][0]
+ day = Day(date, self.postvars['comment'][0])
+ day.save(self.db_conn)
+
+ def do_POST_template(self):
+ id_ = self.params.get('id', None)
+ tmpl = TodoTemplate.by_id(self.db_conn, id_, make_if_none=True)
+ tmpl.title.set(self.postvars['title'][0])
+ tmpl.default_effort.set(float(self.postvars['default_effort'][0]))
+ tmpl.description.set(self.postvars['description'][0])
+ tmpl.save(self.db_conn)
+ self.redir_url = 'templates'
+
+ # GET routes
+
+ def do_GET(self):
+ try:
+ self.init()
+ if self.site in {'day', 'template', 'templates', 'reset_cookie', 'calendar'}:
+ page = getattr(self, f'do_GET_{self.site}')()
+ else:
+ page = self.do_GET_calendar()
+ self.db_conn.close()
+ self.send_HTML(page)
+ except HandledException as msg:
+ self.send_fail(msg)
+
+ def do_GET_reset_cookie(self):
+ self.cookie.reset()
+ msg = 'Cookie has been re-set!'
+ return self.server.html.get_template('msg.html').render(msg=msg)
+
+ def do_GET_day(self):
+ date = self.params.get_cookied('id', Day.todays_date())
+ day = Day.by_date(self.db_conn, date, make_if_none=True)
+ return self.server.html.get_template('day.html').render(day=day)
+
+ def do_GET_calendar(self):
+ from_ = self.params.get_cookied('from', 'yesterday')
+ to = self.params.get_cookied('to', '')
+ days = Day.all(self.db_conn, ensure_betweens=True, date_range=(from_, to))
+ return self.server.html.get_template('calendar.html').render(
+ days=days, from_=from_, to=to)
+
+ def do_GET_template(self):
+ id_ = self.params.get('id', None)
+ template = TodoTemplate.by_id(self.db_conn, id_, make_if_none=True)
+ return self.server.html.get_template('template.html').render(tmpl=template)
+
+ def do_GET_templates(self):
+ templates = TodoTemplate.all(self.db_conn)
+ return self.server.html.get_template('templates.html').render(templates=templates)
+
+
+
+def main():
+ from http.server import HTTPServer
+ from os import environ
+ from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
+ path_todo_db = environ.get('TODO_DB')
+ if not path_todo_db:
+ raise HandledException('TODO_DB environment variable not set.')
+ db_file = TodoDBFile(path_todo_db)
+ server = HTTPServer(('localhost', HTTP_PORT), TodoHandler)
+ server.db_file = db_file
+ server.html = JinjaEnv(loader=JinjaFSLoader(HTML_DIR))
+ print(f'running at http://localhost:{HTTP_PORT}')
+ try:
+ server.serve_forever()
+ except KeyboardInterrupt:
+ print('\ncaught KeyboardInterrupt, stopping server')
+ server.server_close()
+
+
+
+if __name__ == '__main__':
+ try:
+ main()
+ except HandledException as e:
+ e.nice_exit()