1 from plomlib import PlomDB, run_server, PlomHandler, PlomException
4 from datetime import datetime, timedelta
5 from urllib.parse import parse_qs
6 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
7 from urllib.parse import urlparse
8 from os.path import split as path_split
9 db_path = '/home/plom/org/todo_new.json'
11 DATE_FORMAT = '%Y-%m-%d'
12 j2env = JinjaEnv(loader=JinjaFSLoader('todo_templates'))
def today_date(with_time=False):
    """Return the current local date as a 'YYYY-MM-DD' string.

    With ``with_time`` set, the result also carries the time of day,
    'YYYY-MM-DD HH:MM:SS', cut off at whole seconds.
    """
    stamp = datetime.now().isoformat(sep=' ', timespec='seconds')
    if with_time:
        return stamp
    # The date part is always the first ten characters of the stamp.
    return stamp[:10]
22 class AttributeWithHistory:
24 def __init__(self, parent, name, default_if_empty, history=None, then_date='2000-01-01', set_check=None):
27 self.default = default_if_empty
28 self.then_date = then_date
29 self.history = history if history else {}
30 self.set_check = set_check
35 keys = sorted(self.history.keys())
36 if len(self.history) == 0 or value != self.history[keys[-1]]:
37 self.history[today_date(with_time=True)] = value
39 def at(self, queried_date):
40 end_of_queried_day = f'{queried_date} 23:59:59'
41 sorted_dates = sorted(self.history.keys())
42 if self.parent.forked_task and (0 == len(sorted_dates) or sorted_dates[0] > end_of_queried_day):
43 return getattr(self.parent.forked_task, self.name).at(queried_date)
44 elif 0 == len(sorted_dates):
46 ret = self.history[sorted_dates[0]]
47 for date_key, item in self.history.items():
48 if date_key > end_of_queried_day:
55 keys = sorted(self.history.keys())
56 if 0 == len(self.history):
57 if self.parent.forked_task:
58 return getattr(self.parent.forked_task, self.name).now
60 return self.history[keys[-1]]
64 return self.at(self.then_date)
70 def __init__(self, db, id_, comment):
73 self.comment = comment
78 visible_by_and = 0 == len([tag for tag in self.db.tag_filter_and
79 if not tag in self.tags])
80 visible_by_not = 0 == len([tag for tag in self.db.tag_filter_not
82 return visible_by_and and visible_by_not
86 if len(self.deps) == 0:
88 return 1 + max([d.deps_depth for d in self.deps])
93 def add_deps(todo, all_deps):
96 add_deps(dep, all_deps)
97 add_deps(self, all_deps)
98 chain = list(all_deps)
99 chain.sort(key=lambda t: t.deps_depth)
104 class Task(TaskLike):
111 default_effort_history=None,
112 dep_ids_history=None,
115 super().__init__(db, id_, comment)
116 self.forks_id = forks_id
117 self.title = AttributeWithHistory(self, 'title', '', title_history, self.db.selected_date)
118 self.tags = AttributeWithHistory(self, 'tags', set(), tags_history, self.db.selected_date)
119 self.default_effort = AttributeWithHistory(self, 'default_effort', 0.0, default_effort_history, self.db.selected_date)
120 self.dep_ids = AttributeWithHistory(self, 'dep_ids', set(), dep_ids_history, self.db.selected_date, set_check=self.dep_loop_checker())
123 def from_dict(cls, db, d, id_):
128 {k: set(v) for k, v in d['tags_history'].items()},
129 d['default_effort_history'],
130 {k: set(v) for k, v in d['deps_history'].items()},
137 'title_history': self.title.history,
138 'default_effort_history': self.default_effort.history,
139 'tags_history': {k: list(v) for k, v in self.tags.history.items()},
140 'deps_history': {k: list(v) for k, v in self.dep_ids.history.items()},
141 'comment': self.comment,
142 'forks': self.forks_id,
146 # now = today_date(with_time=True)
148 # 'title_history': {now: self},
149 # 'default_effort_history': {now: self},
150 # 'tags_history': {now: self},
151 # 'deps_history': {now: self},
152 # 'comment': self.comment,
154 # return self.db.add_task(dict_source=dict_source)
def forked_task(self):
    """Return the Task this one was forked from, or None without a fork origin.

    The origin is looked up in ``self.db.tasks`` under ``self.forks_id``.
    """
    if not self.forks_id:
        return None
    return self.db.tasks[self.forks_id]
163 for id_ in self.dep_ids.now:
166 elif id_ not in self.db.tasks.keys():
167 raise PlomException(f'dep referenced on Task {self.id_} not found: {id_}')
168 deps += [self.db.tasks[id_]]
172 def deps_weight(self):
173 def count_weight(task):
175 for dep in task.deps:
176 sub_count += count_weight(dep)
178 return count_weight(self)
180 def dep_loop_checker(self):
182 loop_msg = "can't set dep, would create loop"
183 for id_ in dep_ids_now:
185 raise PlomException(loop_msg)
186 elif id_ in self.db.tasks.keys():
187 dep = self.db.tasks[id_]
191 def matches(self, search):
195 return search in self.title.now or search in self.comment or search in '$'.join(self.tags.now)
198 def latest_effort_date(self):
199 todos = [t for t in self.db.todos.values() if t.task.id_ == self.id_]
200 todos.sort(key=lambda t: t.latest_date)
202 return todos[-1].latest_date
210 def __init__(self, db, date, comment=''):
213 self.comment = comment
214 self.linked_todos_as_list = []
217 def from_dict(cls, db, d, date=None):
218 comment = d['comment'] if 'comment' in d.keys() else ''
219 day = cls(db, date, comment)
223 d = {'comment': self.comment}
227 def linked_todos(self):
229 for todo in self.linked_todos_as_list:
230 linked_todos[todo.id_] = todo
233 def _todos_sum(self, include_undone=False):
235 for todo in [todo for todo in self.linked_todos.values()
236 if self.date in todo.efforts.keys()]:
237 day_effort = todo.efforts[self.date]
239 s += day_effort if day_effort else todo.task.default_effort.at(self.date)
241 s += day_effort if day_effort else 0
246 return self._todos_sum()
248 def sorted_todos(self, done, is_tree_shaped, sort_order, legal_keys):
249 todos = [t for t in self.linked_todos_as_list if t.visible and t.done == done]
251 sort_column = sort_order[:]
252 if sort_order and '-' == sort_order[0]:
254 sort_column = sort_order[1:]
255 if sort_column in legal_keys:
256 todos.sort(key=lambda t: getattr(t, sort_column))
260 def walk_tree(todo, sorted_todos):
261 todo.deps = [t for t in sorted_todos if t in todo.deps]
262 for dep in [t for t in sorted_todos if t in todo.deps]:
263 walk_tree(dep, sorted_todos)
265 trunk = [t for t in todos if len([td for td in t.dependers if td.done]) == 0]
267 trunk = [t for t in todos if len(t.dependers) == 0]
269 walk_tree(node, todos)
def visible_in_export(self):
    """Tell whether this day has anything worth showing in an export.

    True when the day's comment is non-empty or at least one of its linked
    todos reports itself as visible.
    """
    comment_length = len(self.comment)
    visible_todos = sum(1 for todo in self.linked_todos_as_list if todo.visible)
    return (comment_length + visible_todos) > 0
279 class Todo(TaskLike):
291 super().__init__(db, id_, comment)
294 for k in efforts.keys():
296 datetime.strptime(k, DATE_FORMAT)
298 raise PlomException(f'Todo creation: bad date string: {k}')
299 self.efforts = efforts if efforts else {}
300 self.day_tags = day_tags if day_tags else set()
301 self.importance = importance
302 self._dep_ids = dep_ids if dep_ids else []
303 self.already_listed = False
304 self.been_observed = False
307 def from_dict(cls, db, d, id_):
322 'task': self.task.id_,
324 'comment': self.comment,
325 'day_tags': list(self.day_tags),
326 'importance': self.importance,
327 'efforts': self.efforts,
328 'deps': self._dep_ids}
332 return self.task.title.at(self.db.selected_date)
def dated_title(self):
    """Return the title prefixed by the todo's earliest date, as 'DATE:TITLE'."""
    earliest_date = self.earliest_date
    title = self.title
    return f'{earliest_date}:{title}'
340 return [self.db.todos[id_] for id_ in self._dep_ids]
343 def deps(self, deps):
344 self._dep_ids = [dep.id_ for dep in deps]
346 loop_msg = "can't set dep, would create loop"
349 raise PlomException(loop_msg)
def default_effort(self):
    """Return the task's default effort as of this todo's earliest date."""
    effort_history = self.task.default_effort
    return effort_history.at(self.earliest_date)
360 return self.deps_done and self._done
def done(self, doneness):
    """Store ``doneness`` as this todo's own done flag (``_done``)."""
    own_flag = doneness
    self._done = own_flag
368 if len(self.deps) > 0:
369 for dep in self.deps:
375 def dep_efforts(self):
378 for dep in self.deps:
379 dep_efforts += dep.all_days_effort
380 dep_efforts += dep.dep_efforts
383 def depender_path(self, dep):
385 return f'Y{depender.depender_path()}:{self.title}Y'
387 return f'X{self.title}X'
390 def depender_paths(self):
392 for depender in self.dependers:
393 if len(depender.depender_paths) == 0:
394 paths += [[depender]]
396 for path in depender.depender_paths:
397 paths += [path + [depender]]
401 def shortened_depender_paths(self):
403 for path in self.depender_paths:
405 for i, step in enumerate(path):
406 next_step = None if len(path) <= i+1 else path[i+1]
407 for distinct_path in new_paths:
408 if len(distinct_path) >= i+2 and distinct_path[i+1] == next_step:
411 new_paths += [new_path]
415 def all_days_effort(self):
417 for effort in self.efforts.values():
418 total += effort if effort else 0
420 total = max(total, self.task.default_effort.at(self.latest_date))
423 def matches(self, search):
427 return search in self.comment or search in '$'.join(self.tags) or search in self.title
429 def is_effort_removable(self, date):
430 if not date in self.efforts.keys():
432 if self.efforts[date]:
434 if self.done and date == self.latest_date:
440 return self.day_tags | self.task.tags.now
def day_effort(self):
    """Return the effort stored for the DB's selected date.

    The lookup is a plain index into ``self.efforts``, so a todo without an
    entry for that date makes it fail.
    """
    efforts = self.efforts
    return efforts[self.db.selected_date]
447 def effort_at_selected_date(self):
448 if self.db.selected_date in self.efforts.keys() and self.day_effort is not None:
449 return self.day_effort
451 return self.task.default_effort.then
455 return self.db.days[self.earliest_date]
458 def sorted_effort_dates(self):
459 dates = list(self.efforts.keys())
def earliest_date(self):
    """Return the first entry of ``sorted_effort_dates``."""
    dates = self.sorted_effort_dates
    return dates[0]
def latest_date(self):
    """Return the last entry of ``sorted_effort_dates``."""
    dates = self.sorted_effort_dates
    return dates[-1]
def has_dependers(self):
    """Tell whether ``self.dependers`` holds at least one entry."""
    depender_count = len(self.dependers)
    return depender_count > 0
477 return len(self.deps) > 0
481 return self.day_effort if self.day_effort else 0
def family_effort(self):
    """Return this todo's effort at the selected date plus its deps' efforts."""
    own_effort = self.effort_at_selected_date
    deps_effort = self.dep_efforts
    return own_effort + deps_effort
488 self.been_observed = True
493 class TodoDB(PlomDB):
497 tag_filter_and = None,
498 tag_filter_not = None):
499 self.selected_date = selected_date if selected_date else today_date()
500 self.tag_filter_and = tag_filter_and if tag_filter_and else []
501 self.tag_filter_not = tag_filter_not if tag_filter_not else []
507 super().__init__(db_path)
511 def read_db_file(self, f):
514 for id_, t_dict in d['tasks'].items():
515 t = self.add_task(id_=id_, dict_source=t_dict)
516 for tag in t.tags.now:
518 for task in self.tasks.values():
519 for dep in task.deps:
520 dep.dependers += [task]
522 for id_, todo_dict in d['todos'].items():
523 todo = self.add_todo(todo_dict, id_)
524 for tag in todo.day_tags:
526 for todo in self.todos.values():
527 for dep in todo.deps:
528 dep.dependers += [todo]
530 for date, day_dict in d['days'].items():
531 self.add_day(dict_source=day_dict, date=date)
532 for todo in self.todos.values():
533 for date in todo.efforts.keys():
534 if not date in self.days.keys():
536 self.days[date].linked_todos_as_list += [todo]
538 # self.set_visibilities()
541 d = {'tasks': {}, 'days': {}, 'todos': {}}
542 for uuid, t in self.tasks.items():
543 d['tasks'][uuid] = t.to_dict()
544 for date, day in self.days.items():
545 d['days'][date] = day.to_dict()
546 for id_, todo in self.todos.items():
547 d['todos'][id_] = todo.to_dict()
552 for date, day in self.days.items():
553 if len(day.linked_todos) == 0 and len(day.comment) == 0:
554 dates_to_purge += [date]
555 for date in dates_to_purge:
557 self.write_text_to_db(json.dumps(self.to_dict()))
def selected_day(self):
    """Return the Day stored for ``selected_date``, creating it when absent.

    A missing date is filled by calling ``add_day``; whatever that call
    returns is (re-)stored under the date before the lookup.
    """
    date = self.selected_date
    if date not in self.days:
        self.days[date] = self.add_day(date=date)
    return self.days[date]
567 # table manipulations
569 def add_day(self, date, dict_source=None):
570 day = Day.from_dict(self, dict_source, date) if dict_source else Day(self, date)
571 self.days[date] = day
574 def add_task(self, id_=None, dict_source=None):
575 id_ = id_ if id_ else str(uuid4())
576 t = Task.from_dict(self, dict_source, id_) if dict_source else Task(self, id_)
580 def fork_task(self, id_):
581 origin = self.tasks[id_]
582 now = today_date(with_time=True)
583 fork_id = str(uuid4())
584 fork = Task(self, fork_id, {}, {}, {}, {}, origin.comment, id_)
585 self.tasks[fork_id] = fork
588 def update_task(self, id_, title, default_effort, tags, comment, dep_ids, depender_ids):
589 task = self.tasks[id_] if id_ in self.tasks.keys() else self.add_task(id_)
590 task.title.set(title)
591 task.default_effort.set(default_effort)
593 task.dep_ids.set(dep_ids)
594 task.comment = comment
595 for depender in task.dependers:
596 if not depender.id_ in depender_ids:
597 depender_dep_ids = depender.dep_ids.now
598 depender_dep_ids.remove(task.id_)
599 depender.dep_ids.set(depender_dep_ids)
600 for depender_id in depender_ids:
601 depender= self.tasks[depender_id]
602 depender_dep_ids = depender.dep_ids.now
603 depender_dep_ids.add(task.id_)
604 depender.dep_ids.set(depender_dep_ids)
607 def add_todo(self, todo_dict=None, id_=None, task=None, efforts=None, parenthood=''):
608 make_children = parenthood != 'childless'
609 adopt_if_possible = parenthood == 'adoption'
610 id_ = id_ if id_ else str(uuid4())
612 todo = Todo.from_dict(self, todo_dict, id_)
613 elif task and efforts:
614 todo = Todo(self, id_, task, efforts=efforts)
616 if make_children and todo.latest_date in self.days.keys():
617 for dep_task in task.deps:
618 # if Todo expects dependencies, adopt any compatibles found in DB.selected_date
619 # before creating new depended Todos
621 if adopt_if_possible:
622 adoptable_todos = [t for t in self.days[todo.latest_date].linked_todos_as_list
623 if t.task.id_ == dep_task.id_]
624 if len(adoptable_todos) > 0:
625 dep_todo = adoptable_todos[0]
627 dep_todo = self.add_todo(task=dep_task, efforts=efforts, parenthood=parenthood)
630 self.todos[id_] = todo
631 for date in todo.efforts.keys():
632 if date in self.days.keys(): # possibly not all Days have been initialized yet
633 self.days[date].linked_todos_as_list += [todo]
636 def _update_todo_shared(self, id_, done, comment, importance):
637 todo = self.todos[id_]
639 todo.comment = comment
640 todo.importance = importance
def update_todo_for_day(self, id_, date, effort, done, comment, importance):
    """Apply the shared todo update, then set the todo's effort for ``date``.

    ``done``, ``comment`` and ``importance`` are handed to
    ``_update_todo_shared`` together with ``id_``; the object it returns
    gets ``effort`` stored in its ``efforts`` mapping under ``date``.
    """
    updated = self._update_todo_shared(id_, done, comment, importance)
    updated.efforts[date] = effort
647 def update_todo(self, id_, efforts, done, comment, tags, importance, deps):
648 todo = self._update_todo_shared(id_, done, comment, importance)
649 if len(efforts) == 0 and not todo.deps:
650 raise PlomException('todo must have at least one effort!')
652 todo.efforts = efforts
653 for date in todo.efforts.keys():
654 if not date in self.days.keys():
655 self.add_day(date=date)
656 if not self in self.days[date].linked_todos_as_list:
657 self.days[date].linked_todos_as_list += [todo]
660 def delete_todo(self, id_):
661 todo = self.todos[id_]
663 for date in todo.efforts.keys():
664 dates_to_delete += [date]
665 for date in dates_to_delete:
666 self.delete_effort(todo, date, force=True)
667 for depender in todo.dependers:
668 depender._dep_ids.remove(todo.id_)
671 def delete_task(self, id_):
674 def delete_effort(self, todo, date, force=False):
675 if (not force) and len(todo.efforts) == 1:
676 raise PlomException('todo must retain at least one effort!')
677 self.days[date].linked_todos_as_list.remove(todo)
678 del todo.efforts[date]
def show_message(self, message):
    """Render the 'message.html' template with ``message`` and return the result."""
    template = j2env.get_template('message.html')
    return template.render(message=message)
def show_calendar_export(self, start_date_str, end_date_str):
    """Render 'calendar_export.html' for the given date range.

    The two limit strings are handed to ``init_calendar_items``; its result
    is passed to the template as ``days``.
    """
    template_args = {'days': self.init_calendar_items(start_date_str, end_date_str)}
    template = j2env.get_template('calendar_export.html')
    return template.render(**template_args)
689 def show_calendar(self, start_date_str, end_date_str):
690 # self.tag_filter_and = ['calendar']
691 # self.tag_filter_not = ['deleted']
693 # todays_date_obj = datetime.strptime(today_date(), DATE_FORMAT)
694 # yesterdays_date_obj = todays_date_obj - timedelta(1)
695 # def get_day_limit_obj(index, day_limit_string):
696 # date_obj = datetime.strptime(sorted(self.days.keys())[index], DATE_FORMAT)
697 # if day_limit_string and len(day_limit_string) > 0:
698 # if day_limit_string in {'today', 'yesterday'}:
699 # date_obj = todays_date_obj if day_limit_string == 'today' else yesterdays_date_obj
701 # date_obj = datetime.strptime(day_limit_string, DATE_FORMAT)
703 # start_date_obj = get_day_limit_obj(0, start_date_str)
704 # end_date_obj = get_day_limit_obj(-1, end_date_str)
707 # for n in range(int((end_date_obj - start_date_obj).days) + 1):
708 # date_obj = start_date_obj + timedelta(n)
709 # date_str = date_obj.strftime(DATE_FORMAT)
710 # if date_str not in self.days.keys():
711 # days_to_show[date_str] = self.add_day(date_str)
713 # days_to_show[date_str] = self.days[date_str]
714 # days_to_show[date_str].month_title = date_obj.strftime('%B') if date_obj.day == 1 else None
715 # days_to_show[date_str].weekday = datetime.strptime(date_str, DATE_FORMAT).strftime('%A')[:2]
717 days_to_show = self.init_calendar_items(start_date_str, end_date_str)
718 return j2env.get_template('calendar.html').render(
719 selected_date=self.selected_date,
721 start_date=start_date_str,
722 end_date=end_date_str)
724 def show_day_todos(self, undone_sort_order=None, done_sort_order=None, is_tree_shaped=False, todo_parenthood=None):
725 legal_undone_sort_keys = {'title', 'sort_done', 'default_effort', 'importance'}
726 legal_done_sort_keys = {'title', 'effort_at_selected_date', 'family_effort'}
728 current_date = datetime.strptime(self.selected_date, DATE_FORMAT)
729 prev_date = current_date - timedelta(days=1)
730 prev_date_str = prev_date.strftime(DATE_FORMAT)
731 next_date = current_date + timedelta(days=1)
732 next_date_str = next_date.strftime(DATE_FORMAT)
734 adoptable_past_todos = []
735 for todo in [t for t in self.todos.values()
738 and t.earliest_date < self.selected_date]:
739 adoptable_past_todos += [todo]
740 undone_todos = self.selected_day.sorted_todos(False, is_tree_shaped, undone_sort_order,
741 legal_undone_sort_keys)
742 done_todos = self.selected_day.sorted_todos(True, is_tree_shaped, done_sort_order,
743 legal_done_sort_keys)
745 return j2env.get_template('day_todos.html').render(
746 day=self.selected_day,
748 filter_and=self.tag_filter_and,
749 filter_not=self.tag_filter_not,
750 prev_date=prev_date_str,
751 adoptable_past_todos=adoptable_past_todos,
752 next_date=next_date_str,
753 all_tasks=[t for t in self.tasks.values()],
754 undone_todos=undone_todos,
755 done_todos=done_todos,
756 is_tree_shaped=is_tree_shaped,
757 undone_sort=undone_sort_order,
758 done_sort=done_sort_order,
759 parenthood=todo_parenthood)
761 def show_todo(self, id_, parenthood):
762 todo = self.todos[id_]
763 filtered_tasks = [t for t in self.tasks.values()
765 filtered_todos = [t for t in self.todos.values()
767 and t not in todo.deps]
769 legal_dates = list(todo.efforts.keys())
770 date_filtered_todos = []
771 for date in legal_dates:
772 for filtered_todo in filtered_todos:
773 if filtered_todo in date_filtered_todos:
775 if date in filtered_todo.efforts.keys():
776 date_filtered_todos += [filtered_todo]
779 for dep in todo.task.deps:
780 dep_slots += [{'task': dep,
781 'todos': [t for t in todo.deps if t.task == dep]}]
784 for dep in todo.task.deps:
785 suggested_todos[dep.id_] = [t for t in date_filtered_todos if t.task.id_ == dep.id_]
786 additional_deps = [t for t in todo.deps if not t.task in todo.task.deps]
788 return j2env.get_template('todo.html').render(
791 filtered_todos=date_filtered_todos,
792 filtered_tasks=filtered_tasks,
794 suggested_todos=suggested_todos,
795 additional_deps=additional_deps,
796 parentood=parenthood,
799 def show_task(self, id_):
801 if not id_ in self.tasks.keys():
802 raise PlomException('no Task for ID')
803 task = self.tasks[id_]
805 task = self.add_task()
807 task.default_effort.set(1.0)
808 filtered_tasks = [t for t in self.tasks.values()
810 and (t not in task.deps)]
812 return j2env.get_template('task.html').render(
813 selected_date=self.selected_date,
815 filtered_tasks=filtered_tasks,
818 def show_tasks(self, search, sort_order=None):
820 for task in [t for t in self.tasks.values() if (not search) or t.matches(search)]:
821 filtered_tasks += [task]
823 sort_column = sort_order
824 if sort_order and '-' == sort_order[0]:
826 sort_column = sort_order[1:]
827 if sort_column == 'title':
828 filtered_tasks.sort(key=lambda t: t.title.now)
829 elif sort_column == 'default_effort':
830 filtered_tasks.sort(key=lambda t: t.default_effort.now)
831 elif sort_column == 'weight':
832 filtered_tasks.sort(key=lambda t: t.deps_weight)
833 elif sort_column == 'latest_effort_date':
834 filtered_tasks.sort(key=lambda t: t.latest_effort_date)
836 filtered_tasks.reverse()
837 return j2env.get_template('tasks.html').render(
839 tasks=filtered_tasks,
841 filter_and=self.tag_filter_and,
842 filter_not=self.tag_filter_not,
847 def init_calendar_items(self, start_date_str, end_date_str):
848 self.tag_filter_and = ['calendar']
849 self.tag_filter_not = ['deleted']
851 todays_date_obj = datetime.strptime(today_date(), DATE_FORMAT)
852 yesterdays_date_obj = todays_date_obj - timedelta(1)
853 def get_day_limit_obj(index, day_limit_string):
854 date_obj = datetime.strptime(sorted(self.days.keys())[index], DATE_FORMAT)
855 if day_limit_string and len(day_limit_string) > 0:
856 if day_limit_string in {'today', 'yesterday'}:
857 date_obj = todays_date_obj if day_limit_string == 'today' else yesterdays_date_obj
859 date_obj = datetime.strptime(day_limit_string, DATE_FORMAT)
861 start_date_obj = get_day_limit_obj(0, start_date_str)
862 end_date_obj = get_day_limit_obj(-1, end_date_str)
865 for n in range(int((end_date_obj - start_date_obj).days) + 1):
866 date_obj = start_date_obj + timedelta(n)
867 date_str = date_obj.strftime(DATE_FORMAT)
868 if date_str not in self.days.keys():
869 days_to_show[date_str] = self.add_day(date_str)
871 days_to_show[date_str] = self.days[date_str]
872 days_to_show[date_str].month_title = date_obj.strftime('%B') if date_obj.day == 1 else None
873 days_to_show[date_str].weekday = datetime.strptime(date_str, DATE_FORMAT).strftime('%A')[:2]
def __init__(self, parsed_url_query, cookie_db):
    """Parse the URL query string and remember the cookie DB.

    ``parsed_url_query`` is run through ``parse_qs``; the result is kept as
    ``params``, and ``cookie_db`` is kept as given.
    """
    query_params = parse_qs(parsed_url_query)
    self.params = query_params
    self.cookie_db = cookie_db
884 def get(self, key, default=None, as_bool=False):
885 # boolean = bool == type(default)
886 param = self.params.get(key, [default])[0]
890 elif param is not None:
def cookie_key_from_params_key(self, prefix, key):
    """Build the cookie DB key for a parameter name.

    A truthy ``prefix`` yields 'PREFIX:KEY'; otherwise ``key`` is returned
    unchanged.
    """
    if not prefix:
        return key
    return f'{prefix}:{key}'
897 def get_cookied(self, key, default=None, prefix=None, as_bool=False):
898 cookie_key = self.cookie_key_from_params_key(prefix, key)
899 param = self.get(key, default, as_bool)
902 if cookie_key in self.cookie_db.keys():
903 del self.cookie_db[cookie_key]
904 if param is None and cookie_key in self.cookie_db.keys():
905 param = self.cookie_db[cookie_key]
906 if param is not None:
907 self.cookie_db[cookie_key] = param
910 def get_cookied_chain(self, key, default=None, prefix=None):
911 cookie_key = self.cookie_key_from_params_key(prefix, key)
912 params = self.params.get(key, default)
915 if cookie_key in self.cookie_db.keys():
916 del self.cookie_db[cookie_key]
917 if params is None and cookie_key in self.cookie_db.keys():
918 params = self.cookie_db[cookie_key]
919 if params is not None:
920 self.cookie_db[cookie_key] = params
925 class PostvarsParser:
927 def __init__(self, postvars):
928 self.postvars = postvars
931 return key in self.postvars.keys()
def get(self, key, on_empty=None, float_if_possible=False):
    """Return what ``get_at_index`` yields for ``key`` at index 0.

    ``on_empty`` and ``float_if_possible`` are passed through unchanged.
    """
    first_index = 0
    return self.get_at_index(key, first_index, on_empty, float_if_possible)
936 def get_at_index(self, key, i, on_empty=None, float_if_possible=False):
937 if self.has(key) and len(self.postvars[key][i]) > 0:
938 val = self.postvars[key][i]
941 if float_if_possible and val is not None:
946 def get_all(self, key, on_empty=None):
947 if self.has(key) and len(self.postvars[key]) > 0:
948 return [v for v in self.postvars[key] if len(v) > 0]
def set(self, key, value):
    """Replace whatever is stored under ``key`` with the one-item list ``[value]``."""
    single_value = [value]
    self.postvars[key] = single_value
955 class TodoHandler(PlomHandler):
957 def config_init(self):
959 'cookie_name': 'todo_cookie',
def app_init(self, handler):
    """Register the todo app's routes on ``handler`` and describe the app.

    GET requests to '/todo' are routed to ``show_db``, POST requests to
    ``write_db``. Returns the app name together with its cookie settings.
    """
    default_path = '/todo'
    routes = (('GET', self.show_db), ('POST', self.write_db))
    for method, callback in routes:
        handler.add_route(method, default_path, callback)
    cookie_config = {'cookie_name': 'todo_cookie', 'cookie_path': default_path}
    return 'todo', cookie_config
970 self.try_do(self.write_db)
973 from urllib.parse import urlencode
974 config = self.apps['todo'] if hasattr(self, 'apps') else self.config_init()
975 parsed_url = urlparse(self.path)
976 site = path_split(parsed_url.path)[1]
977 length = int(self.headers['content-length'])
978 postvars = PostvarsParser(parse_qs(self.rfile.read(length).decode(), keep_blank_values=1))
982 # if we do encounter a filter post, we repost it (and if empty, the emptying command '-')
983 for param_name, filter_db_name in {('and_tag', 'tag_filter_and'), ('not_tag', 'tag_filter_not')}:
984 filter_db = getattr(db, filter_db_name)
985 if postvars.has(param_name):
986 for target in postvars.get_all(param_name, []):
987 if len(target) > 0 and not target in filter_db:
988 filter_db += [target]
989 if len(filter_db) == 0:
990 redir_params += [(param_name, '-')]
991 redir_params += [(param_name, f) for f in filter_db]
992 if site in {'calendar', 'todo'}:
993 redir_params += [('end', postvars.get('end', '-'))]
994 redir_params += [('start', postvars.get('start', '-'))]
995 if site in {'day_todos', 'todo'}:
996 todo_parenthood = postvars.get('parenthood')
997 redir_params += [('parenthood', todo_parenthood)]
998 if postvars.has('filter'):
999 postvars.set('return_to', '')
1002 redir_params += [('search', postvars.get('search', ''))]
1004 elif 'todo' == site:
1005 todo_id = postvars.get('todo_id')
1006 redir_params += [('id', todo_id)]
1007 old_todo = db.todos[todo_id] if todo_id in db.todos.keys() else None
1009 latest_date = db.selected_date
1010 for i, date in enumerate(postvars.get_all('effort_date', [])):
1014 efforts[date] = None
1015 if not (old_todo and old_todo.deps):
1016 efforts[date] = postvars.get_at_index('effort', i, on_empty=None,
1017 float_if_possible=True)
1018 if postvars.has('delete'):
1019 has_day_effort = len([e for e in efforts.values() if e is not None]) > 0
1020 if postvars.has('done')\
1021 or postvars.get('comment')\
1022 or postvars.get_all('tag', [])\
1024 raise PlomException('will not remove todo of preserve-worthy values')
1025 db.delete_todo(todo_id)
1026 postvars.set('return_to', 'calendar')
1027 elif postvars.has('update'):
1028 if postvars.has('delete_effort'):
1029 for date in postvars.get_all('delete_effort'):
1030 db.delete_effort(old_todo, date)
1032 deps = [db.todos[id_] for id_ in postvars.get_all('adopt_dep', [])
1033 if id_ in db.todos.keys()]
1035 if not todo_id in [t.id_ for t in dep.dependers]:
1036 dep.dependers += [db.todos[todo_id]]
1037 birth_dep_ids = postvars.get_all('birth_dep', [])
1038 for id_ in [id_ for id_ in birth_dep_ids if not id_ in db.tasks.keys()]:
1039 raise PlomException('submitted illegal dep ID')
1040 tasks_to_birth = [db.tasks[id_] for id_ in birth_dep_ids]
1041 for task in tasks_to_birth:
1042 deps += [db.add_todo(task=task, efforts={latest_date: None}, parenthood=todo_parenthood)]
1043 db.update_todo(id_=todo_id,
1045 done=postvars.has('done'),
1046 comment=postvars.get('comment', ''),
1047 tags=postvars.get_all('tag', []),
1048 importance=float(postvars.get('importance')),
1051 elif 'task' == site:
1052 task_id = postvars.get('task_id')
1053 if (postvars.has('delete') or postvars.has('fork')) and (not task_id in db.tasks.keys()):
1054 if not task_id in db.tasks.keys():
1055 raise PlomException('can only do this on Task that already exists')
1056 if postvars.has('delete'):
1057 if [t for t in db.todos.values() if task_id == t.task.id_]:
1058 raise PlomException('will not remove Task describing existing Todos')
1059 if postvars.get('title', '')\
1060 or postvars.get_all('tag', [])\
1061 or postvars.get_all('dep', [])\
1062 or postvars.get('comment', ''):
1063 raise PlomException('will not remove Task of preserve-worthy values')
1064 db.delete_task(task_id)
1065 elif postvars.has('update'):
1066 dep_ids = postvars.get_all('dep', [])
1067 for id_ in [id_ for id_ in dep_ids if not id_ in db.tasks.keys()]:
1068 raise PlomException('submitted illegal dep ID')
1069 depender_ids = postvars.get_all('depender', [])
1070 for id_ in [id_ for id_ in depender_ids if not id_ in db.tasks.keys()]:
1071 raise PlomException('submitted illegal dep ID')
1072 task = db.update_task(
1074 title=postvars.get('title', ''),
1075 default_effort=postvars.get('default_effort', float_if_possible=True),
1076 tags=postvars.get_all('tag', []),
1077 comment=postvars.get('comment', ''),
1079 depender_ids=depender_ids)
1080 if postvars.has('add_as_todo'):
1081 db.add_todo(task=task, efforts={postvars.get('new_todo_date'): None})
1082 elif postvars.has('fork'):
1083 t = db.fork_task(task_id)
1086 # dep_ids = postvars.get_all('dep', [])
1087 # for id_ in [id_ for id_ in dep_ids if not id_ in db.tasks.keys()]:
1088 # raise PlomException('submitted illegal dep ID')
1089 # depender_ids = postvars.get_all('depender', [])
1090 # for id_ in [id_ for id_ in depender_ids if not id_ in db.tasks.keys()]:
1091 # raise PlomException('submitted illegal dep ID')
1092 # task = db.update_task(
1094 # title=postvars.get('title', ''),
1095 # default_effort=postvars.get('default_effort', float_if_possible=True),
1096 # tags=postvars.get_all('tag', []),
1097 # comment=postvars.get('comment', ''),
1099 # depender_ids=depender_ids)
1100 # if postvars.has('add_as_todo'):
1101 # db.add_todo(task=task, efforts={postvars.get('new_todo_date'): None})
1103 redir_params += [('id', task_id)]
1105 elif 'day_todos' == site:
1106 if postvars.has('update'):
1107 db.selected_date = postvars.get('date')
1108 redir_params += [('date', db.selected_date)]
1109 db.selected_day.comment = postvars.get('day_comment', '')
1110 task_id = postvars.get('choose_task', None)
1112 if task_id not in db.tasks.keys():
1113 raise PlomException('illegal task ID entered')
1114 db.add_todo(task=db.tasks[task_id], efforts={db.selected_date: None},
1115 parenthood=todo_parenthood)
1116 for id_ in postvars.get_all('choose_todo', []):
1117 db.todos[id_].efforts[db.selected_date] = None
1118 for i, todo_id in enumerate(postvars.get_all('todo_id', [])):
1119 old_todo = db.todos[todo_id]
1120 done = todo_id in postvars.get_all('done', [])
1121 day_effort_input = postvars.get_at_index('effort', i, '')
1122 day_effort = float(day_effort_input) if len(day_effort_input) > 0 else None
1123 comment = postvars.get_at_index('effort_comment', i, '')
1124 if (day_effort is not None) and (not done) and day_effort < 0 and 0 == len(comment):
1125 if len(old_todo.efforts) > 1:
1126 db.delete_effort(old_todo, db.selected_date)
1128 db.delete_todo(todo_id)
1130 importance = float(postvars.get_at_index('importance', i))
1132 and old_todo.done == done\
1133 and old_todo.day_effort == day_effort\
1134 and comment == old_todo.comment\
1135 and old_todo.importance == importance:
1137 db.update_todo_for_day(
1145 homepage = postvars.get('return_to')
1147 encoded_params = urlencode(redir_params)
1148 homepage = f'{site}?{encoded_params}'
1150 self.redirect(homepage)
1153 self.try_do(self.show_db)
1156 config = self.apps['todo'] if hasattr(self, 'apps') else self.config_init()
1157 parsed_url = urlparse(self.path)
1158 site = path_split(parsed_url.path)[1]
1159 cookie_db = self.get_cookie_db(config['cookie_name'])
1160 params = ParamsParser(parsed_url.query, cookie_db)
1162 selected_date = tag_filter_and = tag_filter_not = None
1163 if site in {'day_todos', 'calendar', 'task'}:
1164 selected_date = params.get_cookied('date')
1165 if site in {'day_todos', 'tasks'}:
1166 tag_filter_and = params.get_cookied_chain('and_tag', prefix=site)
1167 tag_filter_not = params.get_cookied_chain('not_tag', prefix=site)
1168 if site in {'day_todos', 'todo'}:
1169 todo_parenthood = params.get_cookied('parenthood', prefix=site)
1170 if site in {'calendar', 'calendar_export', ''}:
1171 start_date = params.get_cookied('start', prefix=site)
1172 end_date = params.get_cookied('end', prefix=site)
1173 db = TodoDB(selected_date, tag_filter_and, tag_filter_not)
1174 if site in {'todo', 'task'}:
1175 id_ = params.get('id')
1176 if 'reset_cookie' == site:
1177 cookie_db = { # sensible defaults
1178 params.cookie_key_from_params_key('day_todos', 'tree'): True,
1179 params.cookie_key_from_params_key('todo', 'and_tag'): ['default'],
1180 params.cookie_key_from_params_key('todo', 'not_tag'): ['ignore'],
1181 params.cookie_key_from_params_key('todo', 'start'): 'yesterday',
1182 params.cookie_key_from_params_key('todo', 'end'): 'today'}
1183 page = db.show_message('cookie unset!')
1184 elif 'day_todos' == site:
1185 is_tree_shaped = params.get_cookied('tree', prefix=site, as_bool=True)
1186 undone_sort_order = params.get_cookied('undone_sort', prefix=site)
1187 done_sort_order = params.get_cookied('done_sort', prefix=site)
1188 page = db.show_day_todos(undone_sort_order, done_sort_order, is_tree_shaped, todo_parenthood)
1189 elif site == 'todo':
1190 page = db.show_todo(id_, parenthood=todo_parenthood)
1191 elif 'task' == site:
1192 page = db.show_task(id_)
1193 elif 'tasks' == site:
1194 sort_order = params.get_cookied('sort', prefix=site)
1195 search = params.get('search', '')
1196 page = db.show_tasks(search, sort_order)
1197 elif 'add_task' == site:
1198 page = db.show_task(None)
1199 elif 'calendar_export' == site:
1200 page = db.show_calendar_export(start_date, end_date)
1201 else: # 'calendar' == site
1202 page = db.show_calendar(start_date, end_date)
1204 self.set_cookie(config['cookie_name'], config['cookie_path'], cookie_db)
1205 self.send_HTML(page)
1208 if __name__ == "__main__":
1209 run_server(server_port, TodoHandler)