X-Git-Url: https://plomlompom.com/repos/?p=misc;a=blobdiff_plain;f=new_todo%2Ftodo.py;fp=new_todo%2Ftodo.py;h=8b8331cea52593f5beff13bfb168d8b17e3f8a4d;hp=0000000000000000000000000000000000000000;hb=9637ae6c570e9cf0cce500d85af03c74d6771362;hpb=ecee822bebf62049803b90fc8d8a0b484915a0fc diff --git a/new_todo/todo.py b/new_todo/todo.py new file mode 100755 index 0000000..8b8331c --- /dev/null +++ b/new_todo/todo.py @@ -0,0 +1,442 @@ +#!/usr/bin/env python3 +from sqlite3 import connect as sql_connect +from http.server import BaseHTTPRequestHandler +from urllib.parse import parse_qs +from datetime import datetime, timedelta + + + +PATH_DB_SCHEMA='init.sql' +HTTP_PORT=8082 +HTML_DIR='html' + + + +DATE_FORMAT = '%Y-%m-%d' + + + +######## basic system stuff ############ + +class HandledException(Exception): + + def nice_exit(self): + from sys import exit as sys_exit + print(f'ABORTING: {self}') + sys_exit(1) + + + +class TodoDBFile: + + def __init__(self, path): + from os.path import isfile + self.path = path + if not isfile(self.path): + self.make_new_if_wanted_else_abort() + self.validate_schema() + + def make_new_if_wanted_else_abort(self): + create_question = f'Database file not found: {self.path}. Create? Y/n\n' + msg_on_no = 'Interpreting reply as "no", but cannot run without database file.' + legal_yesses = {'y', 'yes'} + create_reply = input(create_question) + if not create_reply.lower() in legal_yesses: + raise HandledException(msg_on_no) + with sql_connect(self.path) as conn: + with open(PATH_DB_SCHEMA, 'r') as f: + conn.executescript(f.read()) + + def validate_schema(self): + sql_for_schema = 'SELECT sql FROM sqlite_master ORDER BY sql' + msg_wrong_schema = 'Database has wrong tables schema. Diff:\n' + with sql_connect(self.path) as conn: + schema = ';\n'.join([r[0] for r in conn.execute(sql_for_schema) if r[0]]) + ';' + with open(PATH_DB_SCHEMA, 'r') as f: + stored_schema = f.read().rstrip() + if schema != stored_schema: + from difflib import Differ + d = Differ() + diff_msg = d.compare(schema.splitlines(), stored_schema.splitlines()) + raise HandledException(msg_wrong_schema + '\n'.join(diff_msg)) + + def backup(self): + from shutil import copy2 + from random import randint + copy2(self.path, f'{self.path}.bak.0') + for i in range(0, 6): + if 1 == randint(0, 2**i): + copy2(self.path, f'{self.path}.bak.{i+1}') + break + + + +class TodoDBConnection: + + def __init__(self, db_file): + self.file = db_file + self.conn = sql_connect(self.file.path) + + def commit(self): + self.file.backup() + self.conn.commit() + + def exec(self, code, inputs=None): + if not inputs: + inputs = [] + return self.conn.execute(code, inputs) + + def close(self): + self.conn.close() + + + +######## models ############ + +class VersionedAttribute: + + def __init__(self, db_conn, parent, name, default): + self.parent = parent + self.name = name + self.default = default + self.history = {} + for row in db_conn.exec(f'SELECT * FROM {self.table_name} WHERE template = ?', + (self.parent.id_,)): + self.history[row[1]] = row[2] + + @property + def table_name(self): + return f'versioned_{self.name}s' + + def save(self, db_conn): + for date, value in self.history.items(): + db_conn.exec(f'REPLACE INTO {self.table_name} VALUES (?, ?, ?)', + (self.parent.id_, date, value)) + + @property + def newest_date(self): + return sorted(self.history.keys())[-1] + + @property + def newest(self): + if 0 == len(self.history): + if self.parent.forked_from: + return getattr(self.parent.forked_from, self.name).newest + return self.default + return self.history[self.newest_date] + + def set(self, value): + if 0 == len(self.history) or value != self.history[self.newest_date]: + self.history[Day.todays_date(with_time=True)] = value + + + +class Day: + + def __init__(self, date, comment=''): + self.date = date + self.datetime = self.__class__.date_valid(self.date) + if not self.datetime: + raise HandledException(f'Provided date is of wrong format: {date}') + self.comment = comment + + @classmethod + def from_row(cls, row): + return cls(row[0], row[1]) + + @classmethod + def all(cls, db_conn, ensure_betweens=False, date_range=('', '')): + legal_day_names = {'yesterday', 'today', 'tomorrow'} + for val in [val for val in date_range + if val != '' and val not in legal_day_names + and not cls.date_valid(val)]: + raise HandledException(f'Provided date is of wrong format: {val}') + start_str = date_range[0] if date_range[0] else '2024-01-01' + end_str = date_range[1] if date_range[1] else '2030-12-31' + start_date = getattr(cls, f'{start_str}s_date')() if start_str in legal_day_names else start_str + end_date = getattr(cls, f'{end_str}s_date')() if end_str in legal_day_names else end_str + if start_date > end_date: + temp = end_date + end_date = start_date + start_date = temp + days = [] + for row in db_conn.exec('SELECT * FROM days WHERE date >= ? AND date <= ?', + (start_date, end_date)): + days += [cls.from_row(row)] + if ensure_betweens: + if '' != date_range[0] and not start_date in [d.date for d in days]: + days += [cls(start_date)] + if '' != date_range[1] and not end_date in [d.date for d in days]: + days += [cls(end_date)] + days.sort() + if ensure_betweens and len(days) > 1: + gapless_days = [] + for i, day in enumerate(days): + gapless_days += [day] + if i < len(days) - 1: + while day.next_date != days[i+1].date: + day = Day(day.next_date) + gapless_days += [day] + days = gapless_days + return days + + @classmethod + def by_date(cls, db_conn, date, make_if_none=False): + for row in db_conn.exec('SELECT * FROM days WHERE date = ?', (date,)): + return cls.from_row(row) + return cls(date) if make_if_none else None + + def save(self, db_conn): + db_conn.exec('REPLACE INTO days VALUES (?, ?)', (self.date, self.comment)) + + @classmethod + def date_valid(cls, date): + try: + result = datetime.strptime(date, DATE_FORMAT) + except ValueError: + return None + return result + + @classmethod + def todays_date(cls, with_time=False): + cut_length = 19 if with_time else 10 + return str(datetime.now())[:cut_length] + + @classmethod + def yesterdays_date(cls): + return str(datetime.now() - timedelta(days=1))[:10] + + @classmethod + def tomorrows_date(cls): + return str(datetime.now() + timedelta(days=1))[:10] + + @property + def next_date(self): + next_datetime = self.datetime + timedelta(days=1) + return next_datetime.strftime(DATE_FORMAT) + + @property + def prev_date(self): + prev_datetime = self.datetime - timedelta(days=1) + return prev_datetime.strftime(DATE_FORMAT) + + @property + def weekday(self): + return self.datetime.strftime('%A') + + def __eq__(self, other): + return self.date == other.date + + def __lt__(self, other): + return self.date < other.date + + + +class TodoTemplate: + + def __init__(self, db_conn, id_): + self.id_ = id_ + self.forked_from = None + self.title = VersionedAttribute(db_conn, self, 'title', 'UNNAMED') + self.default_effort = VersionedAttribute(db_conn, self, 'default_effort', 1.0) + self.description = VersionedAttribute(db_conn, self, 'description', '') + + @classmethod + def from_row(cls, db_conn, row): + return cls(db_conn, row[0]) + + @classmethod + def all(cls, db_conn): + tmpls = [] + for row in db_conn.exec('SELECT * FROM templates'): + tmpls += [cls.from_row(db_conn, row)] + return tmpls + + @classmethod + def by_id(cls, db_conn, id_, make_if_none=False): + for row in db_conn.exec('SELECT * FROM templates WHERE id = ?', (id_,)): + return cls.from_row(db_conn, row) + if make_if_none: + return cls(db_conn, id_) + return None + + def save(self, db_conn): + cursor = db_conn.exec('REPLACE INTO templates VALUES (?) RETURNING ID', (self.id_,)) + if self.id_ is None: + self.id_ = cursor.fetchone()[0] + self.title.save(db_conn) + self.default_effort.save(db_conn) + self.description.save(db_conn) + + + +######## web stuff ############ + +class ParamsParser: + + def __init__(self, url_query, site_cookie): + self.params = parse_qs(url_query, keep_blank_values=True) + self.cookie = site_cookie + + def get(self, key, default): + return self.params.get(key, [default])[0] + + def get_cookied(self, key, default): + val = self.params.get(key, [self.cookie.get(key, default)])[0] + self.cookie[key] = val + return val + + + +class CookieDB: + + def __init__(self, site, headers): + from http.cookies import SimpleCookie + from json import loads as json_loads + self.site = site + self.of_site = {} + self.full = SimpleCookie(headers['Cookie']) if 'Cookie' in headers.keys() else SimpleCookie() + if site in self.full.keys(): + self.of_site = json_loads(self.full[site].value) + + def send_headers(self): + from json import dumps as json_dumps + self.full[self.site] = json_dumps(self.of_site) + return [('Set-Cookie', morsel.OutputString()) for morsel in self.full.values()] + + def reset(self): + for morsel in self.full.values(): + morsel['expires'] = 'Thu, 01 Jan 1970 00:00:00 GMT' + + + +class TodoHandler(BaseHTTPRequestHandler): + + def init(self): + from os.path import split as path_split + from urllib.parse import urlparse + parsed_url = urlparse(self.path) + self.site = path_split(parsed_url.path)[1] + if 0 == len(self.site): + self.site = 'calendar' + self.cookie = CookieDB(self.site, self.headers) + self.params = ParamsParser(parsed_url.query, self.cookie.of_site) + self.db_conn = TodoDBConnection(self.server.db_file) + + def send_code_and_headers(self, code, headers, set_cookies=True): + self.send_response(code) + if set_cookies: + [self.send_header(h[0], h[1]) for h in self.cookie.send_headers()] + for fieldname, content in headers: + self.send_header(fieldname, content) + self.end_headers() + + def redirect(self, url): + self.send_code_and_headers(302, [('Location', url)]) + + def send_HTML(self, html, code=200): + self.send_code_and_headers(code, [('Content-type', 'text/html')]) + self.wfile.write(bytes(html, 'utf-8')) + + def send_fail(self, msg, code=400): + html = self.server.html.get_template('msg.html').render(msg=f'Exception: {msg}') + self.send_code_and_headers(code, [('Content-type', 'text/html')], set_cookies=False) + self.wfile.write(bytes(html, 'utf-8')) + + # POST routes + + def do_POST(self): + try: + self.init() + length = int(self.headers['content-length']) + self.redir_url = '/' + self.postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1) + if self.site in {'day', 'template'}: + getattr(self, f'do_POST_{self.site}')() + self.db_conn.commit() + self.db_conn.close() + self.redirect(self.redir_url) + except HandledException as msg: + self.send_fail(msg) + + def do_POST_day(self): + date = self.postvars['date'][0] + day = Day(date, self.postvars['comment'][0]) + day.save(self.db_conn) + + def do_POST_template(self): + id_ = self.params.get('id', None) + tmpl = TodoTemplate.by_id(self.db_conn, id_, make_if_none=True) + tmpl.title.set(self.postvars['title'][0]) + tmpl.default_effort.set(float(self.postvars['default_effort'][0])) + tmpl.description.set(self.postvars['description'][0]) + tmpl.save(self.db_conn) + self.redir_url = 'templates' + + # GET routes + + def do_GET(self): + try: + self.init() + if self.site in {'day', 'template', 'templates', 'reset_cookie', 'calendar'}: + page = getattr(self, f'do_GET_{self.site}')() + else: + page = self.do_GET_calendar() + self.db_conn.close() + self.send_HTML(page) + except HandledException as msg: + self.send_fail(msg) + + def do_GET_reset_cookie(self): + self.cookie.reset() + msg = 'Cookie has been re-set!' + return self.server.html.get_template('msg.html').render(msg=msg) + + def do_GET_day(self): + date = self.params.get_cookied('id', Day.todays_date()) + day = Day.by_date(self.db_conn, date, make_if_none=True) + return self.server.html.get_template('day.html').render(day=day) + + def do_GET_calendar(self): + from_ = self.params.get_cookied('from', 'yesterday') + to = self.params.get_cookied('to', '') + days = Day.all(self.db_conn, ensure_betweens=True, date_range=(from_, to)) + return self.server.html.get_template('calendar.html').render( + days=days, from_=from_, to=to) + + def do_GET_template(self): + id_ = self.params.get('id', None) + template = TodoTemplate.by_id(self.db_conn, id_, make_if_none=True) + return self.server.html.get_template('template.html').render(tmpl=template) + + def do_GET_templates(self): + templates = TodoTemplate.all(self.db_conn) + return self.server.html.get_template('templates.html').render(templates=templates) + + + +def main(): + from http.server import HTTPServer + from os import environ + from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader + path_todo_db = environ.get('TODO_DB') + if not path_todo_db: + raise HandledException('TODO_DB environment variable not set.') + db_file = TodoDBFile(path_todo_db) + server = HTTPServer(('localhost', HTTP_PORT), TodoHandler) + server.db_file = db_file + server.html = JinjaEnv(loader=JinjaFSLoader(HTML_DIR)) + print(f'running at http://localhost:{HTTP_PORT}') + try: + server.serve_forever() + except KeyboardInterrupt: + print('\ncaught KeyboardInterrupt, stopping server') + server.server_close() + + + +if __name__ == '__main__': + try: + main() + except HandledException as e: + e.nice_exit()