--- /dev/null
+#!/usr/bin/env python3
+from sqlite3 import connect as sql_connect
+from http.server import BaseHTTPRequestHandler
+from urllib.parse import parse_qs
+from datetime import datetime, timedelta
+
+
+
+PATH_DB_SCHEMA='init.sql'
+HTTP_PORT=8082
+HTML_DIR='html'
+
+
+
+DATE_FORMAT = '%Y-%m-%d'
+
+
+
+######## basic system stuff ############
+
+class HandledException(Exception):
+
+ def nice_exit(self):
+ from sys import exit as sys_exit
+ print(f'ABORTING: {self}')
+ sys_exit(1)
+
+
+
+class TodoDBFile:
+
+ def __init__(self, path):
+ from os.path import isfile
+ self.path = path
+ if not isfile(self.path):
+ self.make_new_if_wanted_else_abort()
+ self.validate_schema()
+
+ def make_new_if_wanted_else_abort(self):
+ create_question = f'Database file not found: {self.path}. Create? Y/n\n'
+ msg_on_no = 'Interpreting reply as "no", but cannot run without database file.'
+ legal_yesses = {'y', 'yes'}
+ create_reply = input(create_question)
+ if not create_reply.lower() in legal_yesses:
+ raise HandledException(msg_on_no)
+ with sql_connect(self.path) as conn:
+ with open(PATH_DB_SCHEMA, 'r') as f:
+ conn.executescript(f.read())
+
+ def validate_schema(self):
+ sql_for_schema = 'SELECT sql FROM sqlite_master ORDER BY sql'
+ msg_wrong_schema = 'Database has wrong tables schema. Diff:\n'
+ with sql_connect(self.path) as conn:
+ schema = ';\n'.join([r[0] for r in conn.execute(sql_for_schema) if r[0]]) + ';'
+ with open(PATH_DB_SCHEMA, 'r') as f:
+ stored_schema = f.read().rstrip()
+ if schema != stored_schema:
+ from difflib import Differ
+ d = Differ()
+ diff_msg = d.compare(schema.splitlines(), stored_schema.splitlines())
+ raise HandledException(msg_wrong_schema + '\n'.join(diff_msg))
+
+ def backup(self):
+ from shutil import copy2
+ from random import randint
+ copy2(self.path, f'{self.path}.bak.0')
+ for i in range(0, 6):
+ if 1 == randint(0, 2**i):
+ copy2(self.path, f'{self.path}.bak.{i+1}')
+ break
+
+
+
+class TodoDBConnection:
+
+ def __init__(self, db_file):
+ self.file = db_file
+ self.conn = sql_connect(self.file.path)
+
+ def commit(self):
+ self.file.backup()
+ self.conn.commit()
+
+ def exec(self, code, inputs=None):
+ if not inputs:
+ inputs = []
+ return self.conn.execute(code, inputs)
+
+ def close(self):
+ self.conn.close()
+
+
+
+######## models ############
+
+class VersionedAttribute:
+
+ def __init__(self, db_conn, parent, name, default):
+ self.parent = parent
+ self.name = name
+ self.default = default
+ self.history = {}
+ for row in db_conn.exec(f'SELECT * FROM {self.table_name} WHERE template = ?',
+ (self.parent.id_,)):
+ self.history[row[1]] = row[2]
+
+ @property
+ def table_name(self):
+ return f'versioned_{self.name}s'
+
+ def save(self, db_conn):
+ for date, value in self.history.items():
+ db_conn.exec(f'REPLACE INTO {self.table_name} VALUES (?, ?, ?)',
+ (self.parent.id_, date, value))
+
+ @property
+ def newest_date(self):
+ return sorted(self.history.keys())[-1]
+
+ @property
+ def newest(self):
+ if 0 == len(self.history):
+ if self.parent.forked_from:
+ return getattr(self.parent.forked_from, self.name).newest
+ return self.default
+ return self.history[self.newest_date]
+
+ def set(self, value):
+ if 0 == len(self.history) or value != self.history[self.newest_date]:
+ self.history[Day.todays_date(with_time=True)] = value
+
+
+
+class Day:
+
+ def __init__(self, date, comment=''):
+ self.date = date
+ self.datetime = self.__class__.date_valid(self.date)
+ if not self.datetime:
+ raise HandledException(f'Provided date is of wrong format: {date}')
+ self.comment = comment
+
+ @classmethod
+ def from_row(cls, row):
+ return cls(row[0], row[1])
+
+ @classmethod
+ def all(cls, db_conn, ensure_betweens=False, date_range=('', '')):
+ legal_day_names = {'yesterday', 'today', 'tomorrow'}
+ for val in [val for val in date_range
+ if val != '' and val not in legal_day_names
+ and not cls.date_valid(val)]:
+ raise HandledException(f'Provided date is of wrong format: {val}')
+ start_str = date_range[0] if date_range[0] else '2024-01-01'
+ end_str = date_range[1] if date_range[1] else '2030-12-31'
+ start_date = getattr(cls, f'{start_str}s_date')() if start_str in legal_day_names else start_str
+ end_date = getattr(cls, f'{end_str}s_date')() if end_str in legal_day_names else end_str
+ if start_date > end_date:
+ temp = end_date
+ end_date = start_date
+ start_date = temp
+ days = []
+ for row in db_conn.exec('SELECT * FROM days WHERE date >= ? AND date <= ?',
+ (start_date, end_date)):
+ days += [cls.from_row(row)]
+ if ensure_betweens:
+ if '' != date_range[0] and not start_date in [d.date for d in days]:
+ days += [cls(start_date)]
+ if '' != date_range[1] and not end_date in [d.date for d in days]:
+ days += [cls(end_date)]
+ days.sort()
+ if ensure_betweens and len(days) > 1:
+ gapless_days = []
+ for i, day in enumerate(days):
+ gapless_days += [day]
+ if i < len(days) - 1:
+ while day.next_date != days[i+1].date:
+ day = Day(day.next_date)
+ gapless_days += [day]
+ days = gapless_days
+ return days
+
+ @classmethod
+ def by_date(cls, db_conn, date, make_if_none=False):
+ for row in db_conn.exec('SELECT * FROM days WHERE date = ?', (date,)):
+ return cls.from_row(row)
+ return cls(date) if make_if_none else None
+
+ def save(self, db_conn):
+ db_conn.exec('REPLACE INTO days VALUES (?, ?)', (self.date, self.comment))
+
+ @classmethod
+ def date_valid(cls, date):
+ try:
+ result = datetime.strptime(date, DATE_FORMAT)
+ except ValueError:
+ return None
+ return result
+
+ @classmethod
+ def todays_date(cls, with_time=False):
+ cut_length = 19 if with_time else 10
+ return str(datetime.now())[:cut_length]
+
+ @classmethod
+ def yesterdays_date(cls):
+ return str(datetime.now() - timedelta(days=1))[:10]
+
+ @classmethod
+ def tomorrows_date(cls):
+ return str(datetime.now() + timedelta(days=1))[:10]
+
+ @property
+ def next_date(self):
+ next_datetime = self.datetime + timedelta(days=1)
+ return next_datetime.strftime(DATE_FORMAT)
+
+ @property
+ def prev_date(self):
+ prev_datetime = self.datetime - timedelta(days=1)
+ return prev_datetime.strftime(DATE_FORMAT)
+
+ @property
+ def weekday(self):
+ return self.datetime.strftime('%A')
+
+ def __eq__(self, other):
+ return self.date == other.date
+
+ def __lt__(self, other):
+ return self.date < other.date
+
+
+
+class TodoTemplate:
+
+ def __init__(self, db_conn, id_):
+ self.id_ = id_
+ self.forked_from = None
+ self.title = VersionedAttribute(db_conn, self, 'title', 'UNNAMED')
+ self.default_effort = VersionedAttribute(db_conn, self, 'default_effort', 1.0)
+ self.description = VersionedAttribute(db_conn, self, 'description', '')
+
+ @classmethod
+ def from_row(cls, db_conn, row):
+ return cls(db_conn, row[0])
+
+ @classmethod
+ def all(cls, db_conn):
+ tmpls = []
+ for row in db_conn.exec('SELECT * FROM templates'):
+ tmpls += [cls.from_row(db_conn, row)]
+ return tmpls
+
+ @classmethod
+ def by_id(cls, db_conn, id_, make_if_none=False):
+ for row in db_conn.exec('SELECT * FROM templates WHERE id = ?', (id_,)):
+ return cls.from_row(db_conn, row)
+ if make_if_none:
+ return cls(db_conn, id_)
+ return None
+
+ def save(self, db_conn):
+ cursor = db_conn.exec('REPLACE INTO templates VALUES (?) RETURNING ID', (self.id_,))
+ if self.id_ is None:
+ self.id_ = cursor.fetchone()[0]
+ self.title.save(db_conn)
+ self.default_effort.save(db_conn)
+ self.description.save(db_conn)
+
+
+
+######## web stuff ############
+
+class ParamsParser:
+
+ def __init__(self, url_query, site_cookie):
+ self.params = parse_qs(url_query, keep_blank_values=True)
+ self.cookie = site_cookie
+
+ def get(self, key, default):
+ return self.params.get(key, [default])[0]
+
+ def get_cookied(self, key, default):
+ val = self.params.get(key, [self.cookie.get(key, default)])[0]
+ self.cookie[key] = val
+ return val
+
+
+
+class CookieDB:
+
+ def __init__(self, site, headers):
+ from http.cookies import SimpleCookie
+ from json import loads as json_loads
+ self.site = site
+ self.of_site = {}
+ self.full = SimpleCookie(headers['Cookie']) if 'Cookie' in headers.keys() else SimpleCookie()
+ if site in self.full.keys():
+ self.of_site = json_loads(self.full[site].value)
+
+ def send_headers(self):
+ from json import dumps as json_dumps
+ self.full[self.site] = json_dumps(self.of_site)
+ return [('Set-Cookie', morsel.OutputString()) for morsel in self.full.values()]
+
+ def reset(self):
+ for morsel in self.full.values():
+ morsel['expires'] = 'Thu, 01 Jan 1970 00:00:00 GMT'
+
+
+
+class TodoHandler(BaseHTTPRequestHandler):
+
+ def init(self):
+ from os.path import split as path_split
+ from urllib.parse import urlparse
+ parsed_url = urlparse(self.path)
+ self.site = path_split(parsed_url.path)[1]
+ if 0 == len(self.site):
+ self.site = 'calendar'
+ self.cookie = CookieDB(self.site, self.headers)
+ self.params = ParamsParser(parsed_url.query, self.cookie.of_site)
+ self.db_conn = TodoDBConnection(self.server.db_file)
+
+ def send_code_and_headers(self, code, headers, set_cookies=True):
+ self.send_response(code)
+ if set_cookies:
+ [self.send_header(h[0], h[1]) for h in self.cookie.send_headers()]
+ for fieldname, content in headers:
+ self.send_header(fieldname, content)
+ self.end_headers()
+
+ def redirect(self, url):
+ self.send_code_and_headers(302, [('Location', url)])
+
+ def send_HTML(self, html, code=200):
+ self.send_code_and_headers(code, [('Content-type', 'text/html')])
+ self.wfile.write(bytes(html, 'utf-8'))
+
+ def send_fail(self, msg, code=400):
+ html = self.server.html.get_template('msg.html').render(msg=f'Exception: {msg}')
+ self.send_code_and_headers(code, [('Content-type', 'text/html')], set_cookies=False)
+ self.wfile.write(bytes(html, 'utf-8'))
+
+ # POST routes
+
+ def do_POST(self):
+ try:
+ self.init()
+ length = int(self.headers['content-length'])
+ self.redir_url = '/'
+ self.postvars = parse_qs(self.rfile.read(length).decode(), keep_blank_values=1)
+ if self.site in {'day', 'template'}:
+ getattr(self, f'do_POST_{self.site}')()
+ self.db_conn.commit()
+ self.db_conn.close()
+ self.redirect(self.redir_url)
+ except HandledException as msg:
+ self.send_fail(msg)
+
+ def do_POST_day(self):
+ date = self.postvars['date'][0]
+ day = Day(date, self.postvars['comment'][0])
+ day.save(self.db_conn)
+
+ def do_POST_template(self):
+ id_ = self.params.get('id', None)
+ tmpl = TodoTemplate.by_id(self.db_conn, id_, make_if_none=True)
+ tmpl.title.set(self.postvars['title'][0])
+ tmpl.default_effort.set(float(self.postvars['default_effort'][0]))
+ tmpl.description.set(self.postvars['description'][0])
+ tmpl.save(self.db_conn)
+ self.redir_url = 'templates'
+
+ # GET routes
+
+ def do_GET(self):
+ try:
+ self.init()
+ if self.site in {'day', 'template', 'templates', 'reset_cookie', 'calendar'}:
+ page = getattr(self, f'do_GET_{self.site}')()
+ else:
+ page = self.do_GET_calendar()
+ self.db_conn.close()
+ self.send_HTML(page)
+ except HandledException as msg:
+ self.send_fail(msg)
+
+ def do_GET_reset_cookie(self):
+ self.cookie.reset()
+ msg = 'Cookie has been re-set!'
+ return self.server.html.get_template('msg.html').render(msg=msg)
+
+ def do_GET_day(self):
+ date = self.params.get_cookied('id', Day.todays_date())
+ day = Day.by_date(self.db_conn, date, make_if_none=True)
+ return self.server.html.get_template('day.html').render(day=day)
+
+ def do_GET_calendar(self):
+ from_ = self.params.get_cookied('from', 'yesterday')
+ to = self.params.get_cookied('to', '')
+ days = Day.all(self.db_conn, ensure_betweens=True, date_range=(from_, to))
+ return self.server.html.get_template('calendar.html').render(
+ days=days, from_=from_, to=to)
+
+ def do_GET_template(self):
+ id_ = self.params.get('id', None)
+ template = TodoTemplate.by_id(self.db_conn, id_, make_if_none=True)
+ return self.server.html.get_template('template.html').render(tmpl=template)
+
+ def do_GET_templates(self):
+ templates = TodoTemplate.all(self.db_conn)
+ return self.server.html.get_template('templates.html').render(templates=templates)
+
+
+
+def main():
+ from http.server import HTTPServer
+ from os import environ
+ from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
+ path_todo_db = environ.get('TODO_DB')
+ if not path_todo_db:
+ raise HandledException('TODO_DB environment variable not set.')
+ db_file = TodoDBFile(path_todo_db)
+ server = HTTPServer(('localhost', HTTP_PORT), TodoHandler)
+ server.db_file = db_file
+ server.html = JinjaEnv(loader=JinjaFSLoader(HTML_DIR))
+ print(f'running at http://localhost:{HTTP_PORT}')
+ try:
+ server.serve_forever()
+ except KeyboardInterrupt:
+ print('\ncaught KeyboardInterrupt, stopping server')
+ server.server_close()
+
+
+
+if __name__ == '__main__':
+ try:
+ main()
+ except HandledException as e:
+ e.nice_exit()
class AttributeWithHistory:
- def __init__(self, parent, name, default_if_empty, history=None, then_date='2000-01-01', set_check=None):
+ def __init__(self, parent, name, default_if_empty, history=None, then_date='2000-01-01'):
self.parent = parent
self.name = name
self.default = default_if_empty
self.then_date = then_date
self.history = history if history else {}
- self.set_check = set_check
def set(self, value):
- if self.set_check:
- self.set_check(value)
keys = sorted(self.history.keys())
if len(self.history) == 0 or value != self.history[keys[-1]]:
self.history[today_date(with_time=True)] = value
self.db = db
self.id_ = id_
self.comment = comment
- self.dependers = []
@property
def visible(self):
@property
def deps_chain(self):
all_deps = set()
- def add_deps(todo, all_deps):
- for dep in todo.deps:
+ def add_deps(node, all_deps):
+ for dep in node.deps:
all_deps.add(dep)
add_deps(dep, all_deps)
add_deps(self, all_deps)
chain.sort(key=lambda t: t.deps_depth)
return chain
+ def loop_check(self):
+ loop_msg = "can't set dep, would create loop"
+ def f(node, id_):
+ for dep in node.deps:
+ if id_ == dep.id_:
+ raise PlomException(loop_msg)
+ f(dep, id_)
+ f(self, self.id_)
+
+
class Task(TaskLike):
tags_history=None,
default_effort_history=None,
dep_ids_history=None,
+ adoptivity_history=None,
comment='',
forks_id=None):
super().__init__(db, id_, comment)
self.title = AttributeWithHistory(self, 'title', '', title_history, self.db.selected_date)
self.tags = AttributeWithHistory(self, 'tags', set(), tags_history, self.db.selected_date)
self.default_effort = AttributeWithHistory(self, 'default_effort', 0.0, default_effort_history, self.db.selected_date)
- self.dep_ids = AttributeWithHistory(self, 'dep_ids', set(), dep_ids_history, self.db.selected_date, set_check=self.dep_loop_checker())
+ self.dep_ids = AttributeWithHistory(self, 'dep_ids', set(), dep_ids_history, self.db.selected_date)
+ self.fences_adoptions = AttributeWithHistory(self, 'fences_adoptions', False, adoptivity_history, self.db.selected_date)
@classmethod
def from_dict(cls, db, d, id_):
{k: set(v) for k, v in d['tags_history'].items()},
d['default_effort_history'],
{k: set(v) for k, v in d['deps_history'].items()},
- d['comment'],
- d['forks'])
+ d['adoptivity_history'],
+ comment=d['comment'],
+ forks_id=d['forks'])
return t
def to_dict(self):
'default_effort_history': self.default_effort.history,
'tags_history': {k: list(v) for k, v in self.tags.history.items()},
'deps_history': {k: list(v) for k, v in self.dep_ids.history.items()},
+ 'adoptivity_history': self.fences_adoptions.history,
'comment': self.comment,
'forks': self.forks_id,
}
- # def fork(self):
- # now = today_date(with_time=True)
- # dict_source = {
- # 'title_history': {now: self},
- # 'default_effort_history': {now: self},
- # 'tags_history': {now: self},
- # 'deps_history': {now: self},
- # 'comment': self.comment,
- # 'forks': self.id_}
- # return self.db.add_task(dict_source=dict_source)
-
@property
def forked_task(self):
return self.db.tasks[self.forks_id] if self.forks_id else None
return sub_count
return count_weight(self)
- def dep_loop_checker(self):
- def f(dep_ids_now):
- loop_msg = "can't set dep, would create loop"
- for id_ in dep_ids_now:
- if id_ == self.id_:
- raise PlomException(loop_msg)
- elif id_ in self.db.tasks.keys():
- dep = self.db.tasks[id_]
- f(dep.dep_ids.now)
- return f
+ @property
+ def dependers(self):
+ return [t for t in self.db.tasks.values() if self in t.deps]
def matches(self, search):
if search is None:
self.efforts = efforts if efforts else {}
self.day_tags = day_tags if day_tags else set()
self.importance = importance
- self._dep_ids = dep_ids if dep_ids else []
+ self.dep_ids = dep_ids if dep_ids else set()
self.already_listed = False
self.been_observed = False
set(d['day_tags']),
d['importance'],
d['efforts'],
- d['deps'])
+ set(d['deps']))
return todo
def to_dict(self):
'day_tags': list(self.day_tags),
'importance': self.importance,
'efforts': self.efforts,
- 'deps': self._dep_ids}
+ 'deps': list(self.dep_ids)}
@property
def title(self):
@property
def deps(self):
- return [self.db.todos[id_] for id_ in self._dep_ids]
+ return [self.db.todos[id_] for id_ in self.dep_ids]
@deps.setter
def deps(self, deps):
- self._dep_ids = [dep.id_ for dep in deps]
- def f(deps):
- loop_msg = "can't set dep, would create loop"
- for dep in deps:
- if dep == self:
- raise PlomException(loop_msg)
- else:
- f(dep.deps)
- f(self.deps)
+ self.dep_ids = {dep.id_ for dep in deps}
@property
def default_effort(self):
def has_dependers(self):
return len(self.dependers) > 0
+ @property
+ def dependers(self):
+ return [t for t in self.db.todos.values() if self.id_ in t.dep_ids]
+
@property
def has_deps(self):
return len(self.deps) > 0
def family_effort(self):
return self.effort_at_selected_date + self.dep_efforts
+ def closest_adoptable(self, task_id):
+ print("DEBUG closest_adoptable", self.id_)
+ ranked_adoptables = {self.id_: 0}
+ def collect_downwards(node, ranked_adoptables, n_steps):
+ print("debug called collect_downwards", node.title, n_steps)
+ if not node.id_ in ranked_adoptables.keys():
+ ranked_adoptables[node.id_] = n_steps
+ for dep in node.deps:
+ collect_downwards(dep, ranked_adoptables, n_steps+1)
+ def collect_upwards(node, adoptables, n_steps):
+ print("debug called collect_upwards", node.title, n_steps)
+ collect_downwards(node, ranked_adoptables, n_steps+1)
+ if not node.task.fences_adoptions.now:
+ print("debug not blocking", node.dependers)
+ for depender in node.dependers:
+ collect_upwards(depender, ranked_adoptables, n_steps+1)
+ if len(node.dependers) == 0:
+ print("debug NO DEPENDERS, calling day")
+ for t in self.db.days[self.latest_date].linked_todos_as_list:
+ print("debug linked_todos_as_list:", t.title, t.id_)
+ collect_downwards(t, ranked_adoptables, n_steps)
+ else:
+ print("debug BLOCKING")
+ collect_upwards(self, ranked_adoptables, 0)
+ # for depender in self.dependers:
+ # collect_upwards(depender, ranked_adoptables, 0)
+ del ranked_adoptables[self.id_]
+ for id_, distance in sorted(ranked_adoptables.items(), key=lambda i:i[1], reverse=True):
+ if self.db.todos[id_].task.id_ == task_id:
+ return self.db.todos[id_]
+ return None
+
+
def observe(self):
self.been_observed = True
return ''
t = self.add_task(id_=id_, dict_source=t_dict)
for tag in t.tags.now:
self.t_tags.add(tag)
- for task in self.tasks.values():
- for dep in task.deps:
- dep.dependers += [task]
+ # for task in self.tasks.values():
+ # for dep in task.deps:
+ # dep.dependers += [task]
for id_, todo_dict in d['todos'].items():
todo = self.add_todo(todo_dict, id_)
for tag in todo.day_tags:
self.t_tags.add(tag)
- for todo in self.todos.values():
- for dep in todo.deps:
- dep.dependers += [todo]
+ # for todo in self.todos.values():
+ # for dep in todo.deps:
+ # dep.dependers += [todo]
for date, day_dict in d['days'].items():
self.add_day(dict_source=day_dict, date=date)
self.tasks[fork_id] = fork
return fork
- def update_task(self, id_, title, default_effort, tags, comment, dep_ids, depender_ids):
+ def update_task(self, id_, title, default_effort, tags, fences_adoptions, comment, dep_ids, depender_ids):
task = self.tasks[id_] if id_ in self.tasks.keys() else self.add_task(id_)
task.title.set(title)
task.default_effort.set(default_effort)
task.tags.set(tags)
- task.dep_ids.set(dep_ids)
+ task.fences_adoptions.set(fences_adoptions)
task.comment = comment
+ task.dep_ids.set(dep_ids)
for depender in task.dependers:
if not depender.id_ in depender_ids:
depender_dep_ids = depender.dep_ids.now
depender_dep_ids.remove(task.id_)
depender.dep_ids.set(depender_dep_ids)
for depender_id in depender_ids:
- depender= self.tasks[depender_id]
+ depender = self.tasks[depender_id]
depender_dep_ids = depender.dep_ids.now
depender_dep_ids.add(task.id_)
depender.dep_ids.set(depender_dep_ids)
+ task.loop_check()
return task
- def add_todo(self, todo_dict=None, id_=None, task=None, efforts=None, parenthood=''):
+ def add_todo(self, todo_dict=None, id_=None, task=None, efforts=None, parenthood='', force_new=False, creator=None):
make_children = parenthood != 'childless'
adopt_if_possible = parenthood == 'adoption'
id_ = id_ if id_ else str(uuid4())
if todo_dict:
todo = Todo.from_dict(self, todo_dict, id_)
+ self.todos[id_] = todo
elif task and efforts:
todo = Todo(self, id_, task, efforts=efforts)
- deps = []
+ print("DEBUG calling add_todo for", task.title.now, id_)
+ print("debug creator:", creator)
+ self.todos[id_] = todo
+ if creator:
+ creator.deps += [todo]
+ # deps = []
if make_children and todo.latest_date in self.days.keys():
for dep_task in task.deps:
# if Todo expects dependencies, adopt any compatibles found in DB.selected_date
# before creating new depended Todos
dep_todo = None
+ # if not force_new:
if adopt_if_possible:
- adoptable_todos = [t for t in self.days[todo.latest_date].linked_todos_as_list
- if t.task.id_ == dep_task.id_]
- if len(adoptable_todos) > 0:
- dep_todo = adoptable_todos[0]
+ print("debug trying adoption for", dep_task.title.now)
+ # adoptable_todos = [t for t in self.days[todo.latest_date].linked_todos_as_list
+ # if t.task.id_ == dep_task.id_]
+ # if len(adoptable_todos) > 0:
+ # dep_todo = adoptable_todos[0]
+ dep_todo = todo.closest_adoptable(dep_task.id_)
+ print("debug GOT ADOPTION", dep_todo)
if not dep_todo:
- dep_todo = self.add_todo(task=dep_task, efforts=efforts, parenthood=parenthood)
- deps += [dep_todo]
- todo.deps = deps
- self.todos[id_] = todo
+ # dep_todo = self.add_todo(task=dep_task, efforts=efforts, force_new)
+ print("debug no adoption for", dep_task.title.now, id_)
+ dep_todo = self.add_todo(task=dep_task, efforts=efforts, parenthood=parenthood, creator=todo)
+ todo.deps += [dep_todo]
+ # deps += [dep_todo]
+ # todo.deps = deps
for date in todo.efforts.keys():
if date in self.days.keys(): # possibly not all Days have been initialized yet
self.days[date].linked_todos_as_list += [todo]
todo = self._update_todo_shared(id_, done, comment, importance)
todo.efforts[date] = effort
- def update_todo(self, id_, efforts, done, comment, tags, importance, deps):
+ def update_todo(self, id_, efforts, done, comment, tags, importance, deps, depender_ids):
todo = self._update_todo_shared(id_, done, comment, importance)
if len(efforts) == 0 and not todo.deps:
raise PlomException('todo must have at least one effort!')
- todo.deps = deps
todo.efforts = efforts
for date in todo.efforts.keys():
if not date in self.days.keys():
if not self in self.days[date].linked_todos_as_list:
self.days[date].linked_todos_as_list += [todo]
todo.day_tags = tags
+ todo.deps = deps
+ for depender_id in depender_ids:
+ depender = self.todos[depender_id]
+ depender.dep_ids.add(todo.id_)
+ for depender in todo.dependers:
+ if not depender.id_ in depender_ids:
+ depender.dep_ids.remove(todo.id_)
+ todo.loop_check()
def delete_todo(self, id_):
todo = self.todos[id_]
for date in dates_to_delete:
self.delete_effort(todo, date, force=True)
for depender in todo.dependers:
- depender._dep_ids.remove(todo.id_)
+ depender.dep_ids.remove(todo.id_)
del self.todos[id_]
def delete_task(self, id_):
# views
- def show_message(self, message):
+ def get_message(self, message):
return j2env.get_template('message.html').render(message=message)
- def show_calendar_export(self, start_date_str, end_date_str):
+ def get_calendar_export(self, start_date_str, end_date_str):
days_to_show = self.init_calendar_items(start_date_str, end_date_str)
return j2env.get_template('calendar_export.html').render(days=days_to_show)
- def show_calendar(self, start_date_str, end_date_str):
- # self.tag_filter_and = ['calendar']
- # self.tag_filter_not = ['deleted']
-
- # todays_date_obj = datetime.strptime(today_date(), DATE_FORMAT)
- # yesterdays_date_obj = todays_date_obj - timedelta(1)
- # def get_day_limit_obj(index, day_limit_string):
- # date_obj = datetime.strptime(sorted(self.days.keys())[index], DATE_FORMAT)
- # if day_limit_string and len(day_limit_string) > 0:
- # if day_limit_string in {'today', 'yesterday'}:
- # date_obj = todays_date_obj if day_limit_string == 'today' else yesterdays_date_obj
- # else:
- # date_obj = datetime.strptime(day_limit_string, DATE_FORMAT)
- # return date_obj
- # start_date_obj = get_day_limit_obj(0, start_date_str)
- # end_date_obj = get_day_limit_obj(-1, end_date_str)
-
- # days_to_show = {}
- # for n in range(int((end_date_obj - start_date_obj).days) + 1):
- # date_obj = start_date_obj + timedelta(n)
- # date_str = date_obj.strftime(DATE_FORMAT)
- # if date_str not in self.days.keys():
- # days_to_show[date_str] = self.add_day(date_str)
- # else:
- # days_to_show[date_str] = self.days[date_str]
- # days_to_show[date_str].month_title = date_obj.strftime('%B') if date_obj.day == 1 else None
- # days_to_show[date_str].weekday = datetime.strptime(date_str, DATE_FORMAT).strftime('%A')[:2]
-
+ def get_calendar(self, start_date_str, end_date_str):
days_to_show = self.init_calendar_items(start_date_str, end_date_str)
return j2env.get_template('calendar.html').render(
selected_date=self.selected_date,
start_date=start_date_str,
end_date=end_date_str)
- def show_day_todos(self, undone_sort_order=None, done_sort_order=None, is_tree_shaped=False, todo_parenthood=None):
+ def get_day_todos(self, undone_sort_order=None, done_sort_order=None, is_tree_shaped=False, todo_parenthood=None):
legal_undone_sort_keys = {'title', 'sort_done', 'default_effort', 'importance'}
legal_done_sort_keys = {'title', 'effort_at_selected_date', 'family_effort'}
done_sort=done_sort_order,
parenthood=todo_parenthood)
- def show_todo(self, id_, parenthood):
+ def get_todo(self, id_, parenthood):
todo = self.todos[id_]
filtered_tasks = [t for t in self.tasks.values()
if t != todo.task]
parentood=parenthood,
dep_todos=todo.deps)
- def show_task(self, id_):
+ def get_task(self, id_):
if id_:
if not id_ in self.tasks.keys():
raise PlomException('no Task for ID')
filtered_tasks=filtered_tasks,
task=task)
- def show_tasks(self, search, sort_order=None):
+ def get_tasks(self, search, sort_order=None):
filtered_tasks = []
for task in [t for t in self.tasks.values() if (not search) or t.matches(search)]:
filtered_tasks += [task]
filter_not=self.tag_filter_not,
search=search)
+ # posts
+
+ def post_todo(self, id_, postvars, todo_parenthood):
+ if postvars.has('delete') and (not id_ in self.todos.keys()):
+ raise PlomException('can only do this on Todo that already exists')
+ old_todo = self.todos[id_] if id_ in self.todos.keys() else None
+ latest_date = self.selected_date
+ efforts = {}
+ for i, date in enumerate(postvars.get_all('effort_date', [])):
+ if '' == date:
+ continue
+ latest_date = date
+ efforts[date] = postvars.get_at_index('effort', i, float_if_possible=True)
+ if postvars.has('delete'):
+ raise PlomException('can only do this on Task that already exists')
+ has_day_effort = len([e for e in efforts.values() if e is not None]) > 0
+ if postvars.has('done')\
+ or postvars.get('comment')\
+ or postvars.get_all('tag', [])\
+ or has_day_effort:
+ raise PlomException('will not remove todo of preserve-worthy values')
+ self.delete_todo(id_)
+ return False
+ elif postvars.has('update'):
+ if postvars.has('delete_effort'):
+ for date in postvars.get_all('delete_effort'):
+ self.delete_effort(old_todo, date)
+ del efforts[date]
+ deps = [self.todos[adopt_id] for adopt_id in postvars.get_all('adopt_dep', [])
+ if adopt_id in self.todos.keys()]
+ birth_dep_ids = postvars.get_all('birth_dep', [])
+ for bad_id in [bad_id for bad_id in birth_dep_ids if not bad_id in self.tasks.keys()]:
+ raise PlomException('submitted illegal dep ID')
+ tasks_to_birth = [self.tasks[dep_id] for dep_id in birth_dep_ids]
+ for task in tasks_to_birth:
+ deps += [self.add_todo(task=task,
+ efforts={latest_date: None},
+ parenthood=todo_parenthood)]
+ depender_ids = postvars.get_all('depender', [])
+ self.update_todo(id_=id_,
+ efforts=efforts,
+ done=postvars.has('done'),
+ comment=postvars.get('comment', ''),
+ tags=postvars.get_all('tag', []),
+ importance=postvars.get('importance', float_if_possible=True),
+ deps=deps,
+ depender_ids=depender_ids)
+ return True
+
+ def post_task(self, id_, postvars):
+ if (postvars.has('delete') or postvars.has('fork')) and (not id_ in self.tasks.keys()):
+ raise PlomException('can only do this on Task that already exists')
+ if postvars.has('delete'):
+ if [t for t in self.todos.values() if id_ == t.task.id_]:
+ raise PlomException('will not remove Task describing existing Todos')
+ if postvars.get('title', '')\
+ or postvars.get_all('tag', [])\
+ or postvars.get_all('dep', [])\
+ or postvars.get('comment', ''):
+ raise PlomException('will not remove Task of preserve-worthy values')
+ self.delete_task(id_)
+ return None
+ elif postvars.has('update'):
+ dep_ids = postvars.get_all('dep', [])
+ for bad_id in [bad_id for bad_id in dep_ids if not bad_id in self.tasks.keys()]:
+ raise PlomException('submitted illegal dep ID')
+ depender_ids = postvars.get_all('depender', [])
+ for bad_id in [bad_id_ for bad_id in depender_ids if not bad_id in self.tasks.keys()]:
+ raise PlomException('submitted illegal dep ID')
+ task = self.update_task(
+ id_=id_,
+ title=postvars.get('title', ''),
+ default_effort=postvars.get('default_effort', float_if_possible=True),
+ tags=postvars.get_all('tag', []),
+ comment=postvars.get('comment', ''),
+ fences_adoptions=postvars.get('fences_adoptions', False),
+ dep_ids=dep_ids,
+ depender_ids=depender_ids)
+ if postvars.has('add_as_todo'):
+ self.add_todo(task=task, efforts={postvars.get('new_todo_date'): None})
+ elif postvars.has('fork'):
+ t = self.fork_task(id_)
+ return t.id_
+ return id_
+
+ def post_day_todos(self, postvars, todo_parenthood):
+ if not postvars.has('update'):
+ return
+ self.selected_date = postvars.get('date')
+ self.selected_day.comment = postvars.get('day_comment', '')
+ task_id = postvars.get('choose_task', None)
+ if task_id:
+ if task_id not in self.tasks.keys():
+ raise PlomException('illegal task ID entered')
+ self.add_todo(task=self.tasks[task_id], efforts={self.selected_date: None},
+ parenthood=todo_parenthood)
+ for todo_id in postvars.get_all('choose_todo', []):
+ self.todos[todo_id].efforts[self.selected_date] = None
+ for i, todo_id in enumerate(postvars.get_all('todo_id', [])):
+ old_todo = self.todos[todo_id]
+ done = todo_id in postvars.get_all('done', [])
+ day_effort_input = postvars.get_at_index('effort', i, '')
+ day_effort = float(day_effort_input) if len(day_effort_input) > 0 else None
+ comment = postvars.get_at_index('effort_comment', i, '')
+ if (day_effort is not None) and (not done) and day_effort < 0 and 0 == len(comment):
+ if len(old_todo.efforts) > 1:
+ self.delete_effort(old_todo, self.selected_date)
+ else:
+ self.delete_todo(todo_id)
+ continue
+ importance = float(postvars.get_at_index('importance', i))
+ if old_todo\
+ and old_todo.done == done\
+ and old_todo.day_effort == day_effort\
+ and comment == old_todo.comment\
+ and old_todo.importance == importance:
+ continue
+ self.update_todo_for_day(
+ todo_id,
+ self.selected_date,
+ day_effort,
+ done,
+ comment,
+ importance)
+
# helpers
def init_calendar_items(self, start_date_str, end_date_str):
self.cookie_db = cookie_db
def get(self, key, default=None, as_bool=False):
- # boolean = bool == type(default)
param = self.params.get(key, [default])[0]
if as_bool:
if param == '0':
def cookie_key_from_params_key(self, prefix, key):
return f'{prefix}:{key}' if prefix else key
- def get_cookied(self, key, default=None, prefix=None, as_bool=False):
- cookie_key = self.cookie_key_from_params_key(prefix, key)
- param = self.get(key, default, as_bool)
- if param == '-':
- param = None
- if cookie_key in self.cookie_db.keys():
- del self.cookie_db[cookie_key]
- if param is None and cookie_key in self.cookie_db.keys():
- param = self.cookie_db[cookie_key]
- if param is not None:
- self.cookie_db[cookie_key] = param
- return param
-
- def get_cookied_chain(self, key, default=None, prefix=None):
- cookie_key = self.cookie_key_from_params_key(prefix, key)
- params = self.params.get(key, default)
- if params == ['-']:
+ def _get_cookied(self, cookie_key, params):
+ if params in [['-'], '-']:
params = None
if cookie_key in self.cookie_db.keys():
del self.cookie_db[cookie_key]
self.cookie_db[cookie_key] = params
return params
+ def get_cookied(self, key, default=None, prefix=None, as_bool=False):
+ cookie_key = self.cookie_key_from_params_key(prefix, key)
+ param = self.get(key, default, as_bool)
+ return self._get_cookied(cookie_key, param)
+
+ def get_cookied_chain(self, key, default=None, prefix=None):
+ cookie_key = self.cookie_key_from_params_key(prefix, key)
+ params = self.params.get(key, default)
+ return self._get_cookied(cookie_key, params)
+
class PostvarsParser:
self.postvars[key] = [value]
+
class TodoHandler(PlomHandler):
def config_init(self):
db = TodoDB()
redir_params = []
# if we do encounter a filter post, we repost it (and if empty, the emptying command '-')
- for param_name, filter_db_name in {('and_tag', 'tag_filter_and'), ('not_tag', 'tag_filter_not')}:
+ for param_name, filter_db_name in {('and_tag', 'tag_filter_and'),
+ ('not_tag', 'tag_filter_not')}:
filter_db = getattr(db, filter_db_name)
if postvars.has(param_name):
for target in postvars.get_all(param_name, []):
if site in {'day_todos', 'todo'}:
todo_parenthood = postvars.get('parenthood')
redir_params += [('parenthood', todo_parenthood)]
- if postvars.has('filter'):
- postvars.set('return_to', '')
-
+ if site in {'todo', 'task'}:
+ id_ = postvars.get('id')
+
if 'tasks' == site:
redir_params += [('search', postvars.get('search', ''))]
-
elif 'todo' == site:
- todo_id = postvars.get('todo_id')
- redir_params += [('id', todo_id)]
- old_todo = db.todos[todo_id] if todo_id in db.todos.keys() else None
- efforts = {}
- latest_date = db.selected_date
- for i, date in enumerate(postvars.get_all('effort_date', [])):
- if '' == date:
- continue
- latest_date = date
- efforts[date] = None
- if not (old_todo and old_todo.deps):
- efforts[date] = postvars.get_at_index('effort', i, on_empty=None,
- float_if_possible=True)
- if postvars.has('delete'):
- has_day_effort = len([e for e in efforts.values() if e is not None]) > 0
- if postvars.has('done')\
- or postvars.get('comment')\
- or postvars.get_all('tag', [])\
- or has_day_effort:
- raise PlomException('will not remove todo of preserve-worthy values')
- db.delete_todo(todo_id)
- postvars.set('return_to', 'calendar')
- elif postvars.has('update'):
- if postvars.has('delete_effort'):
- for date in postvars.get_all('delete_effort'):
- db.delete_effort(old_todo, date)
- del efforts[date]
- deps = [db.todos[id_] for id_ in postvars.get_all('adopt_dep', [])
- if id_ in db.todos.keys()]
- for dep in deps:
- if not todo_id in [t.id_ for t in dep.dependers]:
- dep.dependers += [db.todos[todo_id]]
- birth_dep_ids = postvars.get_all('birth_dep', [])
- for id_ in [id_ for id_ in birth_dep_ids if not id_ in db.tasks.keys()]:
- raise PlomException('submitted illegal dep ID')
- tasks_to_birth = [db.tasks[id_] for id_ in birth_dep_ids]
- for task in tasks_to_birth:
- deps += [db.add_todo(task=task, efforts={latest_date: None}, parenthood=todo_parenthood)]
- db.update_todo(id_=todo_id,
- efforts=efforts,
- done=postvars.has('done'),
- comment=postvars.get('comment', ''),
- tags=postvars.get_all('tag', []),
- importance=float(postvars.get('importance')),
- deps=deps)
-
+ if db.post_todo(id_, postvars, todo_parenthood):
+ redir_params += [('id', id_)]
+ else:
+ site = 'calendar'
elif 'task' == site:
- task_id = postvars.get('task_id')
- if (postvars.has('delete') or postvars.has('fork')) and (not task_id in db.tasks.keys()):
- if not task_id in db.tasks.keys():
- raise PlomException('can only do this on Task that already exists')
- if postvars.has('delete'):
- if [t for t in db.todos.values() if task_id == t.task.id_]:
- raise PlomException('will not remove Task describing existing Todos')
- if postvars.get('title', '')\
- or postvars.get_all('tag', [])\
- or postvars.get_all('dep', [])\
- or postvars.get('comment', ''):
- raise PlomException('will not remove Task of preserve-worthy values')
- db.delete_task(task_id)
- elif postvars.has('update'):
- dep_ids = postvars.get_all('dep', [])
- for id_ in [id_ for id_ in dep_ids if not id_ in db.tasks.keys()]:
- raise PlomException('submitted illegal dep ID')
- depender_ids = postvars.get_all('depender', [])
- for id_ in [id_ for id_ in depender_ids if not id_ in db.tasks.keys()]:
- raise PlomException('submitted illegal dep ID')
- task = db.update_task(
- id_=task_id,
- title=postvars.get('title', ''),
- default_effort=postvars.get('default_effort', float_if_possible=True),
- tags=postvars.get_all('tag', []),
- comment=postvars.get('comment', ''),
- dep_ids=dep_ids,
- depender_ids=depender_ids)
- if postvars.has('add_as_todo'):
- db.add_todo(task=task, efforts={postvars.get('new_todo_date'): None})
- elif postvars.has('fork'):
- t = db.fork_task(task_id)
- task_id = t.id_
-
- # dep_ids = postvars.get_all('dep', [])
- # for id_ in [id_ for id_ in dep_ids if not id_ in db.tasks.keys()]:
- # raise PlomException('submitted illegal dep ID')
- # depender_ids = postvars.get_all('depender', [])
- # for id_ in [id_ for id_ in depender_ids if not id_ in db.tasks.keys()]:
- # raise PlomException('submitted illegal dep ID')
- # task = db.update_task(
- # id_=task_id,
- # title=postvars.get('title', ''),
- # default_effort=postvars.get('default_effort', float_if_possible=True),
- # tags=postvars.get_all('tag', []),
- # comment=postvars.get('comment', ''),
- # dep_ids=dep_ids,
- # depender_ids=depender_ids)
- # if postvars.has('add_as_todo'):
- # db.add_todo(task=task, efforts={postvars.get('new_todo_date'): None})
-
- redir_params += [('id', task_id)]
-
+ ret = db.post_task(id_, postvars)
+ if ret:
+ redir_params += [('id', ret)]
+ else:
+ site = 'calendar'
elif 'day_todos' == site:
- if postvars.has('update'):
- db.selected_date = postvars.get('date')
- redir_params += [('date', db.selected_date)]
- db.selected_day.comment = postvars.get('day_comment', '')
- task_id = postvars.get('choose_task', None)
- if task_id:
- if task_id not in db.tasks.keys():
- raise PlomException('illegal task ID entered')
- db.add_todo(task=db.tasks[task_id], efforts={db.selected_date: None},
- parenthood=todo_parenthood)
- for id_ in postvars.get_all('choose_todo', []):
- db.todos[id_].efforts[db.selected_date] = None
- for i, todo_id in enumerate(postvars.get_all('todo_id', [])):
- old_todo = db.todos[todo_id]
- done = todo_id in postvars.get_all('done', [])
- day_effort_input = postvars.get_at_index('effort', i, '')
- day_effort = float(day_effort_input) if len(day_effort_input) > 0 else None
- comment = postvars.get_at_index('effort_comment', i, '')
- if (day_effort is not None) and (not done) and day_effort < 0 and 0 == len(comment):
- if len(old_todo.efforts) > 1:
- db.delete_effort(old_todo, db.selected_date)
- else:
- db.delete_todo(todo_id)
- continue
- importance = float(postvars.get_at_index('importance', i))
- if old_todo\
- and old_todo.done == done\
- and old_todo.day_effort == day_effort\
- and comment == old_todo.comment\
- and old_todo.importance == importance:
- continue
- db.update_todo_for_day(
- todo_id,
- db.selected_date,
- day_effort,
- done,
- comment,
- importance)
-
- homepage = postvars.get('return_to')
- if not homepage:
- encoded_params = urlencode(redir_params)
- homepage = f'{site}?{encoded_params}'
+ db.post_day_todos(postvars, todo_parenthood)
+ redir_params += [('date', db.selected_date)]
+
+ encoded_params = urlencode(redir_params)
+ homepage = f'{site}?{encoded_params}'
db.write()
self.redirect(homepage)
config = self.apps['todo'] if hasattr(self, 'apps') else self.config_init()
parsed_url = urlparse(self.path)
site = path_split(parsed_url.path)[1]
+
cookie_db = self.get_cookie_db(config['cookie_name'])
params = ParamsParser(parsed_url.query, cookie_db)
-
selected_date = tag_filter_and = tag_filter_not = None
if site in {'day_todos', 'calendar', 'task'}:
selected_date = params.get_cookied('date')
if site in {'calendar', 'calendar_export', ''}:
start_date = params.get_cookied('start', prefix=site)
end_date = params.get_cookied('end', prefix=site)
- db = TodoDB(selected_date, tag_filter_and, tag_filter_not)
if site in {'todo', 'task'}:
id_ = params.get('id')
+
+ db = TodoDB(selected_date, tag_filter_and, tag_filter_not)
if 'reset_cookie' == site:
cookie_db = { # sensible defaults
params.cookie_key_from_params_key('day_todos', 'tree'): True,
params.cookie_key_from_params_key('todo', 'not_tag'): ['ignore'],
params.cookie_key_from_params_key('todo', 'start'): 'yesterday',
params.cookie_key_from_params_key('todo', 'end'): 'today'}
- page = db.show_message('cookie unset!')
+ page = db.get_message('cookie unset!')
elif 'day_todos' == site:
- is_tree_shaped = params.get_cookied('tree', prefix=site, as_bool=True)
+ tree_view = params.get_cookied('tree', prefix=site, as_bool=True)
undone_sort_order = params.get_cookied('undone_sort', prefix=site)
done_sort_order = params.get_cookied('done_sort', prefix=site)
- page = db.show_day_todos(undone_sort_order, done_sort_order, is_tree_shaped, todo_parenthood)
+ page = db.get_day_todos(undone_sort_order, done_sort_order, tree_view, todo_parenthood)
elif site == 'todo':
- page = db.show_todo(id_, parenthood=todo_parenthood)
+ page = db.get_todo(id_, parenthood=todo_parenthood)
elif 'task' == site:
- page = db.show_task(id_)
+ page = db.get_task(id_)
elif 'tasks' == site:
sort_order = params.get_cookied('sort', prefix=site)
search = params.get('search', '')
- page = db.show_tasks(search, sort_order)
+ page = db.get_tasks(search, sort_order)
elif 'add_task' == site:
- page = db.show_task(None)
+ page = db.get_task(None)
elif 'calendar_export' == site:
- page = db.show_calendar_export(start_date, end_date)
+ page = db.get_calendar_export(start_date, end_date)
else: # 'calendar' == site
- page = db.show_calendar(start_date, end_date)
+ page = db.get_calendar(start_date, end_date)
self.set_cookie(config['cookie_name'], config['cookie_path'], cookie_db)
self.send_HTML(page)
+
if __name__ == "__main__":
run_server(server_port, TodoHandler)