home · contact · privacy
Improve accounting scripts, start todo.py rewrite.
[misc] / new_todo / todo.py
diff --git a/new_todo/todo.py b/new_todo/todo.py
new file mode 100755 (executable)
index 0000000..8b8331c
--- /dev/null
@@ -0,0 +1,442 @@
+#!/usr/bin/env python3
+from sqlite3 import connect as sql_connect
+from http.server import BaseHTTPRequestHandler
+from urllib.parse import parse_qs
+from datetime import datetime, timedelta
+
+
+
+PATH_DB_SCHEMA='init.sql'
+HTTP_PORT=8082
+HTML_DIR='html'
+
+
+
+DATE_FORMAT = '%Y-%m-%d'
+
+
+
+######## basic system stuff ############
+
+class HandledException(Exception):
+
+    def nice_exit(self):
+        from sys import exit as sys_exit
+        print(f'ABORTING: {self}')
+        sys_exit(1)
+
+
+
+class TodoDBFile:
+
+    def __init__(self, path):
+        from os.path import isfile
+        self.path = path
+        if not isfile(self.path):
+            self.make_new_if_wanted_else_abort()
+        self.validate_schema()
+
+    def make_new_if_wanted_else_abort(self):
+        create_question = f'Database file not found: {self.path}. Create? Y/n\n'
+        msg_on_no = 'Interpreting reply as "no", but cannot run without database file.'
+        legal_yesses = {'y', 'yes'}
+        create_reply = input(create_question)
+        if not create_reply.lower() in legal_yesses: 
+            raise HandledException(msg_on_no)
+        with sql_connect(self.path) as conn:
+            with open(PATH_DB_SCHEMA, 'r') as f:
+                conn.executescript(f.read())
+
+    def validate_schema(self):
+        sql_for_schema = 'SELECT sql FROM sqlite_master ORDER BY sql'
+        msg_wrong_schema = 'Database has wrong tables schema. Diff:\n'
+        with sql_connect(self.path) as conn:
+            schema = ';\n'.join([r[0] for r in conn.execute(sql_for_schema) if r[0]]) + ';'
+            with open(PATH_DB_SCHEMA, 'r') as f:
+                stored_schema = f.read().rstrip()
+                if schema != stored_schema:
+                    from difflib import Differ
+                    d = Differ()
+                    diff_msg = d.compare(schema.splitlines(), stored_schema.splitlines())
+                    raise HandledException(msg_wrong_schema + '\n'.join(diff_msg))
+
+    def backup(self):
+        from shutil import copy2
+        from random import randint
+        copy2(self.path, f'{self.path}.bak.0')
+        for i in range(0, 6):
+            if 1 == randint(0, 2**i):
+                copy2(self.path, f'{self.path}.bak.{i+1}')
+                break
+
+
+
+class TodoDBConnection:
+
+    def __init__(self, db_file):
+        self.file = db_file
+        self.conn = sql_connect(self.file.path)
+    
+    def commit(self):
+        self.file.backup()
+        self.conn.commit()
+
+    def exec(self, code, inputs=None):
+        if not inputs:
+            inputs = []
+        return self.conn.execute(code, inputs)
+
+    def close(self):
+        self.conn.close()
+
+
+
+######## models ############
+
+class VersionedAttribute:
+
+    def __init__(self, db_conn, parent, name, default):
+        self.parent = parent
+        self.name = name
+        self.default = default
+        self.history = {}
+        for row in db_conn.exec(f'SELECT * FROM {self.table_name} WHERE template = ?',
+                                (self.parent.id_,)):
+            self.history[row[1]] = row[2]
+
+    @property
+    def table_name(self):
+        return f'versioned_{self.name}s' 
+
+    def save(self, db_conn):
+        for date, value in self.history.items():
+            db_conn.exec(f'REPLACE INTO {self.table_name} VALUES (?, ?, ?)',
+                         (self.parent.id_, date, value))
+
+    @property
+    def newest_date(self):
+        return sorted(self.history.keys())[-1]
+
+    @property
+    def newest(self):
+        if 0 == len(self.history):
+            if self.parent.forked_from:
+                return getattr(self.parent.forked_from, self.name).newest
+            return self.default
+        return self.history[self.newest_date]
+
+    def set(self, value):
+        if 0 == len(self.history) or value != self.history[self.newest_date]:
+            self.history[Day.todays_date(with_time=True)] = value
+
+
+
+class Day:
+
+    def __init__(self, date, comment=''):
+        self.date = date
+        self.datetime = self.__class__.date_valid(self.date) 
+        if not self.datetime:
+            raise HandledException(f'Provided date is of wrong format: {date}')
+        self.comment = comment
+
+    @classmethod
+    def from_row(cls, row):
+        return cls(row[0], row[1])
+
+    @classmethod
+    def all(cls, db_conn, ensure_betweens=False, date_range=('', '')):
+        legal_day_names = {'yesterday', 'today', 'tomorrow'}
+        for val in [val for val in date_range
+                    if val != '' and val not in legal_day_names
+                    and not cls.date_valid(val)]:
+            raise HandledException(f'Provided date is of wrong format: {val}')
+        start_str = date_range[0] if date_range[0] else '2024-01-01'
+        end_str = date_range[1] if date_range[1] else '2030-12-31'
+        start_date = getattr(cls, f'{start_str}s_date')() if start_str in legal_day_names else start_str
+        end_date = getattr(cls, f'{end_str}s_date')() if end_str in legal_day_names else end_str
+        if start_date > end_date:
+            temp = end_date
+            end_date = start_date
+            start_date = temp
+        days = []
+        for row in db_conn.exec('SELECT * FROM days WHERE date >= ? AND date <= ?',
+                                (start_date, end_date)):
+            days += [cls.from_row(row)]
+        if ensure_betweens:
+            if '' != date_range[0] and not start_date in [d.date for d in days]: 
+                days += [cls(start_date)]
+            if '' != date_range[1] and not end_date in [d.date for d in days]: 
+                days += [cls(end_date)]
+        days.sort()
+        if ensure_betweens and len(days) > 1:
+            gapless_days = []
+            for i, day in enumerate(days):
+                gapless_days += [day]
+                if i < len(days) - 1:
+                    while day.next_date != days[i+1].date:
+                        day = Day(day.next_date)
+                        gapless_days += [day]
+            days = gapless_days
+        return days
+
+    @classmethod
+    def by_date(cls, db_conn, date, make_if_none=False):
+        for row in db_conn.exec('SELECT * FROM days WHERE date = ?', (date,)):
+            return cls.from_row(row)
+        return cls(date) if make_if_none else None
+
+    def save(self, db_conn):
+        db_conn.exec('REPLACE INTO days VALUES (?, ?)', (self.date, self.comment))
+
+    @classmethod
+    def date_valid(cls, date):
+        try:
+            result = datetime.strptime(date, DATE_FORMAT)
+        except ValueError:
+            return None 
+        return result 
+
+    @classmethod
+    def todays_date(cls, with_time=False):
+        cut_length = 19 if with_time else 10
+        return str(datetime.now())[:cut_length]
+
+    @classmethod
+    def yesterdays_date(cls):
+        return str(datetime.now() - timedelta(days=1))[:10]
+
+    @classmethod
+    def tomorrows_date(cls):
+        return str(datetime.now() + timedelta(days=1))[:10]
+
+    @property
+    def next_date(self):
+        next_datetime = self.datetime + timedelta(days=1)
+        return next_datetime.strftime(DATE_FORMAT)
+
+    @property
+    def prev_date(self):
+        prev_datetime = self.datetime - timedelta(days=1)
+        return prev_datetime.strftime(DATE_FORMAT)
+
+    @property
+    def weekday(self):
+        return self.datetime.strftime('%A')
+
+    def __eq__(self, other):
+        return self.date == other.date
+
+    def __lt__(self, other):
+        return self.date < other.date
+
+
+
+class TodoTemplate:
+
+    def __init__(self, db_conn, id_):
+        self.id_ = id_ 
+        self.forked_from = None
+        self.title = VersionedAttribute(db_conn, self, 'title', 'UNNAMED') 
+        self.default_effort = VersionedAttribute(db_conn, self, 'default_effort', 1.0) 
+        self.description = VersionedAttribute(db_conn, self, 'description', '') 
+
+    @classmethod
+    def from_row(cls, db_conn, row):
+        return cls(db_conn, row[0])
+
+    @classmethod
+    def all(cls, db_conn):
+        tmpls = []
+        for row in db_conn.exec('SELECT * FROM templates'):
+            tmpls += [cls.from_row(db_conn, row)]
+        return tmpls 
+
+    @classmethod
+    def by_id(cls, db_conn, id_, make_if_none=False):
+        for row in db_conn.exec('SELECT * FROM templates WHERE id = ?', (id_,)):
+            return cls.from_row(db_conn, row)
+        if make_if_none:
+            return cls(db_conn, id_) 
+        return None
+
+    def save(self, db_conn):
+        cursor = db_conn.exec('REPLACE INTO templates VALUES (?) RETURNING ID', (self.id_,))
+        if self.id_ is None:
+            self.id_ = cursor.fetchone()[0]
+        self.title.save(db_conn)
+        self.default_effort.save(db_conn)
+        self.description.save(db_conn)
+
+
+
+######## web stuff ############
+
+class ParamsParser:
+
+    def __init__(self, url_query, site_cookie):
+        self.params = parse_qs(url_query, keep_blank_values=True)
+        self.cookie = site_cookie
+
+    def get(self, key, default):
+        return self.params.get(key, [default])[0]
+
+    def get_cookied(self, key, default):
+        val = self.params.get(key, [self.cookie.get(key, default)])[0]
+        self.cookie[key] = val 
+        return val 
+
+
+
+class CookieDB:
+
+    def __init__(self, site, headers):
+        from http.cookies import SimpleCookie
+        from json import loads as json_loads
+        self.site = site
+        self.of_site = {}
+        self.full = SimpleCookie(headers['Cookie']) if 'Cookie' in headers.keys() else SimpleCookie()
+        if site in self.full.keys():
+            self.of_site = json_loads(self.full[site].value)
+
+    def send_headers(self):
+        from json import dumps as json_dumps
+        self.full[self.site] = json_dumps(self.of_site)
+        return [('Set-Cookie', morsel.OutputString()) for morsel in self.full.values()]
+
+    def reset(self):
+        for morsel in self.full.values():
+             morsel['expires'] = 'Thu, 01 Jan 1970 00:00:00 GMT'
+
+
+
+class TodoHandler(BaseHTTPRequestHandler):
+
+    def init(self):
+        from os.path import split as path_split
+        from urllib.parse import urlparse
+        parsed_url = urlparse(self.path)
+        self.site = path_split(parsed_url.path)[1]
+        if 0 == len(self.site):
+            self.site = 'calendar'
+        self.cookie = CookieDB(self.site, self.headers)
+        self.params = ParamsParser(parsed_url.query, self.cookie.of_site)
+        self.db_conn = TodoDBConnection(self.server.db_file)
+
+    def send_code_and_headers(self, code, headers, set_cookies=True):
+        self.send_response(code)
+        if set_cookies:
+            [self.send_header(h[0], h[1]) for h in self.cookie.send_headers()]
+        for fieldname, content in headers:
+            self.send_header(fieldname, content)
+        self.end_headers()
+
+    def redirect(self, url):
+        self.send_code_and_headers(302, [('Location', url)])
+
+    def send_HTML(self, html, code=200):
+        self.send_code_and_headers(code, [('Content-type', 'text/html')])
+        self.wfile.write(bytes(html, 'utf-8'))
+
+    def send_fail(self, msg, code=400):
+        html = self.server.html.get_template('msg.html').render(msg=f'Exception: {msg}')
+        self.send_code_and_headers(code, [('Content-type', 'text/html')], set_cookies=False)
+        self.wfile.write(bytes(html, 'utf-8'))
+
+    # POST routes
+
+    def do_POST(self):
+        try:
+            self.init()
+            length = int(self.headers['content-length'])
+            self.redir_url = '/'
+            self.postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1)
+            if self.site in {'day', 'template'}:
+                getattr(self, f'do_POST_{self.site}')()
+            self.db_conn.commit()
+            self.db_conn.close()
+            self.redirect(self.redir_url)
+        except HandledException as msg:
+            self.send_fail(msg)
+
+    def do_POST_day(self):
+        date = self.postvars['date'][0]
+        day = Day(date, self.postvars['comment'][0])
+        day.save(self.db_conn)
+
+    def do_POST_template(self):
+        id_ = self.params.get('id', None)
+        tmpl = TodoTemplate.by_id(self.db_conn, id_, make_if_none=True)
+        tmpl.title.set(self.postvars['title'][0])
+        tmpl.default_effort.set(float(self.postvars['default_effort'][0]))
+        tmpl.description.set(self.postvars['description'][0])
+        tmpl.save(self.db_conn)
+        self.redir_url = 'templates'
+
+    # GET routes
+
+    def do_GET(self):
+        try:
+            self.init()
+            if self.site in {'day', 'template', 'templates', 'reset_cookie', 'calendar'}:
+                page = getattr(self, f'do_GET_{self.site}')()
+            else:
+                page = self.do_GET_calendar() 
+            self.db_conn.close()
+            self.send_HTML(page)
+        except HandledException as msg:
+            self.send_fail(msg)
+
+    def do_GET_reset_cookie(self):
+        self.cookie.reset()
+        msg = 'Cookie has been re-set!'
+        return self.server.html.get_template('msg.html').render(msg=msg)
+
+    def do_GET_day(self):
+        date = self.params.get_cookied('id', Day.todays_date())
+        day = Day.by_date(self.db_conn, date, make_if_none=True)
+        return self.server.html.get_template('day.html').render(day=day)
+
+    def do_GET_calendar(self):
+        from_ = self.params.get_cookied('from', 'yesterday') 
+        to = self.params.get_cookied('to', '') 
+        days = Day.all(self.db_conn, ensure_betweens=True, date_range=(from_, to))
+        return self.server.html.get_template('calendar.html').render(
+                days=days, from_=from_, to=to)
+
+    def do_GET_template(self):
+        id_ = self.params.get('id', None)
+        template = TodoTemplate.by_id(self.db_conn, id_, make_if_none=True)
+        return self.server.html.get_template('template.html').render(tmpl=template)
+
+    def do_GET_templates(self):
+        templates = TodoTemplate.all(self.db_conn)
+        return self.server.html.get_template('templates.html').render(templates=templates)
+
+
+
+def main():
+    from http.server import HTTPServer
+    from os import environ
+    from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader 
+    path_todo_db = environ.get('TODO_DB')
+    if not path_todo_db:
+        raise HandledException('TODO_DB environment variable not set.')
+    db_file = TodoDBFile(path_todo_db)
+    server = HTTPServer(('localhost', HTTP_PORT), TodoHandler)
+    server.db_file = db_file
+    server.html = JinjaEnv(loader=JinjaFSLoader(HTML_DIR))
+    print(f'running at http://localhost:{HTTP_PORT}')
+    try:
+        server.serve_forever()
+    except KeyboardInterrupt:
+        print('\ncaught KeyboardInterrupt, stopping server')
+    server.server_close()
+
+
+
+if __name__ == '__main__':
+    try:
+        main()
+    except HandledException as e:
+        e.nice_exit()