1 from plomlib import PlomDB, run_server, PlomHandler, PlomException
4 from datetime import datetime, timedelta
5 from urllib.parse import parse_qs
6 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
7 from urllib.parse import urlparse
8 from os.path import split as path_split
9 db_path = '/home/plom/org/todo_new.json'
11 DATE_FORMAT = '%Y-%m-%d'
12 j2env = JinjaEnv(loader=JinjaFSLoader('todo_templates'))
def today_date(with_time=False):
    """Return the current local date as an ISO-style string.

    With ``with_time`` falsy the result is ``YYYY-MM-DD`` (10 characters);
    otherwise it is ``YYYY-MM-DD HH:MM:SS`` (19 characters, no fraction).
    """
    now = datetime.now()
    if with_time:
        # Space-separated, second precision: same text as str(now)[:19].
        return now.isoformat(sep=' ', timespec='seconds')
    return now.date().isoformat()
22 class AttributeWithHistory:
24 def __init__(self, parent, name, default_if_empty, history=None, then_date='2000-01-01'):
27 self.default = default_if_empty
28 self.then_date = then_date
29 self.history = history if history else {}
32 keys = sorted(self.history.keys())
33 if len(self.history) == 0 or value != self.history[keys[-1]]:
34 self.history[today_date(with_time=True)] = value
36 def at(self, queried_date):
37 end_of_queried_day = f'{queried_date} 23:59:59'
38 sorted_dates = sorted(self.history.keys())
39 if self.parent.forked_task and (0 == len(sorted_dates) or sorted_dates[0] > end_of_queried_day):
40 return getattr(self.parent.forked_task, self.name).at(queried_date)
41 elif 0 == len(sorted_dates):
43 ret = self.history[sorted_dates[0]]
44 for date_key, item in self.history.items():
45 if date_key > end_of_queried_day:
52 keys = sorted(self.history.keys())
53 if 0 == len(self.history):
54 if self.parent.forked_task:
55 return getattr(self.parent.forked_task, self.name).now
57 return self.history[keys[-1]]
61 return self.at(self.then_date)
67 def __init__(self, db, id_, comment):
70 self.comment = comment
74 visible_by_and = 0 == len([tag for tag in self.db.tag_filter_and
75 if not tag in self.tags])
76 visible_by_not = 0 == len([tag for tag in self.db.tag_filter_not
78 return visible_by_and and visible_by_not
82 if len(self.deps) == 0:
84 return 1 + max([d.deps_depth for d in self.deps])
89 def add_deps(node, all_deps):
92 add_deps(dep, all_deps)
93 add_deps(self, all_deps)
94 chain = list(all_deps)
95 chain.sort(key=lambda t: t.deps_depth)
99 loop_msg = "can't set dep, would create loop"
101 for dep in node.deps:
103 raise PlomException(loop_msg)
110 class Task(TaskLike):
117 default_effort_history=None,
118 dep_ids_history=None,
119 adoptivity_history=None,
122 super().__init__(db, id_, comment)
123 self.forks_id = forks_id
124 self.title = AttributeWithHistory(self, 'title', '', title_history, self.db.selected_date)
125 self.tags = AttributeWithHistory(self, 'tags', set(), tags_history, self.db.selected_date)
126 self.default_effort = AttributeWithHistory(self, 'default_effort', 0.0, default_effort_history, self.db.selected_date)
127 self.dep_ids = AttributeWithHistory(self, 'dep_ids', set(), dep_ids_history, self.db.selected_date)
128 self.fences_adoptions = AttributeWithHistory(self, 'fences_adoptions', False, adoptivity_history, self.db.selected_date)
131 def from_dict(cls, db, d, id_):
136 {k: set(v) for k, v in d['tags_history'].items()},
137 d['default_effort_history'],
138 {k: set(v) for k, v in d['deps_history'].items()},
139 d['adoptivity_history'],
140 comment=d['comment'],
146 'title_history': self.title.history,
147 'default_effort_history': self.default_effort.history,
148 'tags_history': {k: list(v) for k, v in self.tags.history.items()},
149 'deps_history': {k: list(v) for k, v in self.dep_ids.history.items()},
150 'adoptivity_history': self.fences_adoptions.history,
151 'comment': self.comment,
152 'forks': self.forks_id,
def forked_task(self):
    """Return the Task this one was forked from, or None.

    ``self.forks_id`` is looked up in ``self.db.tasks``; a falsy
    ``forks_id`` means there is no origin task.
    """
    origin_id = self.forks_id
    if not origin_id:
        return None
    return self.db.tasks[origin_id]
162 for id_ in self.dep_ids.now:
165 elif id_ not in self.db.tasks.keys():
166 raise PlomException(f'dep referenced on Task {self.id_} not found: {id_}')
167 deps += [self.db.tasks[id_]]
171 def deps_weight(self):
172 def count_weight(task):
174 for dep in task.deps:
175 sub_count += count_weight(dep)
177 return count_weight(self)
181 return [t for t in self.db.tasks.values() if self in t.deps]
183 def matches(self, search):
187 return search in self.title.now or search in self.comment or search in '$'.join(self.tags.now)
190 def latest_effort_date(self):
191 todos = [t for t in self.db.todos.values() if t.task.id_ == self.id_]
192 todos.sort(key=lambda t: t.latest_date)
194 return todos[-1].latest_date
202 def __init__(self, db, date, comment=''):
205 self.comment = comment
206 self.linked_todos_as_list = []
209 def from_dict(cls, db, d, date=None):
210 comment = d['comment'] if 'comment' in d.keys() else ''
211 day = cls(db, date, comment)
215 d = {'comment': self.comment}
219 def linked_todos(self):
221 for todo in self.linked_todos_as_list:
222 linked_todos[todo.id_] = todo
225 def _todos_sum(self, include_undone=False):
227 for todo in [todo for todo in self.linked_todos.values()
228 if self.date in todo.efforts.keys()]:
229 day_effort = todo.efforts[self.date]
231 s += day_effort if day_effort else todo.task.default_effort.at(self.date)
233 s += day_effort if day_effort else 0
238 return self._todos_sum()
240 def sorted_todos(self, done, is_tree_shaped, sort_order, legal_keys):
241 todos = [t for t in self.linked_todos_as_list if t.visible and t.done == done]
244 sort_column = sort_order[:]
245 if sort_order and '-' == sort_order[0]:
247 sort_column = sort_order[1:]
248 if sort_column in legal_keys:
249 todos.sort(key=lambda t: getattr(t, sort_column))
253 def walk_tree(todo, sorted_todos):
254 todo.deps = [t for t in sorted_todos if t in todo.deps]
255 for dep in [t for t in sorted_todos if t in todo.deps]:
256 walk_tree(dep, sorted_todos)
258 trunk = [t for t in todos if len([td for td in t.dependers if td.done]) == 0]
260 trunk = [t for t in todos if len(t.dependers) == 0]
262 walk_tree(node, todos)
def visible_in_export(self):
    """Tell whether this day has anything to show in an export.

    True when the day has a non-empty comment or at least one linked todo
    whose ``visible`` attribute is truthy.
    """
    has_comment = len(self.comment) > 0
    visible_count = sum(1 for todo in self.linked_todos_as_list if todo.visible)
    return has_comment or visible_count > 0
272 class Todo(TaskLike):
284 super().__init__(db, id_, comment)
287 for k in efforts.keys():
289 datetime.strptime(k, DATE_FORMAT)
291 raise PlomException(f'Todo creation: bad date string: {k}')
292 self.efforts = efforts if efforts else {}
293 self.day_tags = day_tags if day_tags else set()
294 self.importance = importance
295 self.dep_ids = dep_ids if dep_ids else set()
296 self.already_listed = False
297 self.been_observed = False
300 def from_dict(cls, db, d, id_):
315 'task': self.task.id_,
317 'comment': self.comment,
318 'day_tags': list(self.day_tags),
319 'importance': self.importance,
320 'efforts': self.efforts,
321 'deps': list(self.dep_ids)}
325 return self.task.title.at(self.db.selected_date)
def dated_title(self):
    """Return the title prefixed by the earliest effort date, as ``date:title``."""
    first_date = self.earliest_date
    title = self.title
    return f'{first_date}:{title}'
333 return [self.db.todos[id_] for id_ in self.dep_ids]
def deps(self, deps):
    """Replace ``self.dep_ids`` with the set of ``id_`` values of ``deps``.

    NOTE(review): the signature suggests this is the setter half of a
    ``deps`` property; the decorator is not visible here -- confirm.
    """
    collected_ids = set()
    for dep in deps:
        collected_ids.add(dep.id_)
    self.dep_ids = collected_ids
def default_effort(self):
    """Return the task's default effort as of this todo's earliest date.

    Delegates to ``self.task.default_effort.at(...)`` with
    ``self.earliest_date`` as the queried date.
    """
    effort_history = self.task.default_effort
    return effort_history.at(self.earliest_date)
345 return self.deps_done and self._done
def done(self, doneness):
    """Store ``doneness`` on the private ``_done`` attribute.

    NOTE(review): the signature suggests this is the setter half of a
    ``done`` property; the decorator is not visible here -- confirm.
    """
    self._done = doneness
353 if len(self.deps) > 0:
354 for dep in self.deps:
360 def dep_efforts(self):
363 for dep in self.deps:
364 dep_efforts += dep.all_days_effort
365 dep_efforts += dep.dep_efforts
368 def depender_path(self, dep):
370 return f'Y{depender.depender_path()}:{self.title}Y'
372 return f'X{self.title}X'
375 def depender_paths(self):
377 for depender in self.dependers:
378 if len(depender.depender_paths) == 0:
379 paths += [[depender]]
381 for path in depender.depender_paths:
382 paths += [path + [depender]]
386 def shortened_depender_paths(self):
388 for path in self.depender_paths:
390 for i, step in enumerate(path):
391 next_step = None if len(path) <= i+1 else path[i+1]
392 for distinct_path in new_paths:
393 if len(distinct_path) >= i+2 and distinct_path[i+1] == next_step:
396 new_paths += [new_path]
400 def all_days_effort(self):
402 for effort in self.efforts.values():
403 total += effort if effort else 0
405 total = max(total, self.task.default_effort.at(self.latest_date))
408 def matches(self, search):
412 return search in self.comment or search in '$'.join(self.tags) or search in self.title
414 def is_effort_removable(self, date):
415 if not date in self.efforts.keys():
417 if self.efforts[date]:
419 if self.done and date == self.latest_date:
425 return self.day_tags | self.task.tags.now
def day_effort(self):
    """Return the effort stored for the DB's currently selected date.

    Indexes ``self.efforts`` with ``self.db.selected_date``; a missing
    entry propagates the container's lookup error to the caller.
    """
    efforts_by_date = self.efforts
    return efforts_by_date[self.db.selected_date]
432 def effort_at_selected_date(self):
433 if self.db.selected_date in self.efforts.keys() and self.day_effort is not None:
434 return self.day_effort
436 return self.task.default_effort.then
440 return self.db.days[self.earliest_date]
443 def sorted_effort_dates(self):
444 dates = list(self.efforts.keys())
def earliest_date(self):
    """Return the first entry of ``self.sorted_effort_dates``."""
    effort_dates = self.sorted_effort_dates
    return effort_dates[0]
def latest_date(self):
    """Return the last entry of ``self.sorted_effort_dates``."""
    effort_dates = self.sorted_effort_dates
    return effort_dates[-1]
def has_dependers(self):
    """Tell whether ``self.dependers`` contains at least one entry."""
    depender_count = len(self.dependers)
    return depender_count != 0
462 return [t for t in self.db.todos.values() if self.id_ in t.dep_ids]
466 return len(self.deps) > 0
470 return self.day_effort if self.day_effort else 0
def family_effort(self):
    """Return this todo's effort at the selected date plus its deps' efforts.

    Adds ``self.effort_at_selected_date`` and ``self.dep_efforts``.
    """
    own_effort = self.effort_at_selected_date
    inherited_effort = self.dep_efforts
    return own_effort + inherited_effort
476 def closest_adoptable(self, task_id):
477 print("DEBUG closest_adoptable", self.id_)
478 ranked_adoptables = {self.id_: 0}
479 def collect_downwards(node, ranked_adoptables, n_steps):
480 print("debug called collect_downwards", node.title, n_steps)
481 if not node.id_ in ranked_adoptables.keys():
482 ranked_adoptables[node.id_] = n_steps
483 for dep in node.deps:
484 collect_downwards(dep, ranked_adoptables, n_steps+1)
485 def collect_upwards(node, adoptables, n_steps):
486 print("debug called collect_upwards", node.title, n_steps)
487 collect_downwards(node, ranked_adoptables, n_steps+1)
488 if not node.task.fences_adoptions.now:
489 print("debug not blocking", node.dependers)
490 for depender in node.dependers:
491 collect_upwards(depender, ranked_adoptables, n_steps+1)
492 if len(node.dependers) == 0:
493 print("debug NO DEPENDERS, calling day")
494 for t in self.db.days[self.latest_date].linked_todos_as_list:
495 print("debug linked_todos_as_list:", t.title, t.id_)
496 collect_downwards(t, ranked_adoptables, n_steps)
498 print("debug BLOCKING")
499 collect_upwards(self, ranked_adoptables, 0)
500 # for depender in self.dependers:
501 # collect_upwards(depender, ranked_adoptables, 0)
502 del ranked_adoptables[self.id_]
503 for id_, distance in sorted(ranked_adoptables.items(), key=lambda i:i[1], reverse=True):
504 if self.db.todos[id_].task.id_ == task_id:
505 return self.db.todos[id_]
510 self.been_observed = True
515 class TodoDB(PlomDB):
519 tag_filter_and = None,
520 tag_filter_not = None):
521 self.selected_date = selected_date if selected_date else today_date()
522 self.tag_filter_and = tag_filter_and if tag_filter_and else []
523 self.tag_filter_not = tag_filter_not if tag_filter_not else []
529 super().__init__(db_path)
533 def read_db_file(self, f):
536 for id_, t_dict in d['tasks'].items():
537 t = self.add_task(id_=id_, dict_source=t_dict)
538 for tag in t.tags.now:
540 # for task in self.tasks.values():
541 # for dep in task.deps:
542 # dep.dependers += [task]
544 for id_, todo_dict in d['todos'].items():
545 todo = self.add_todo(todo_dict, id_)
546 for tag in todo.day_tags:
548 # for todo in self.todos.values():
549 # for dep in todo.deps:
550 # dep.dependers += [todo]
552 for date, day_dict in d['days'].items():
553 self.add_day(dict_source=day_dict, date=date)
554 for todo in self.todos.values():
555 for date in todo.efforts.keys():
556 if not date in self.days.keys():
558 self.days[date].linked_todos_as_list += [todo]
560 # self.set_visibilities()
563 d = {'tasks': {}, 'days': {}, 'todos': {}}
564 for uuid, t in self.tasks.items():
565 d['tasks'][uuid] = t.to_dict()
566 for date, day in self.days.items():
567 d['days'][date] = day.to_dict()
568 for id_, todo in self.todos.items():
569 d['todos'][id_] = todo.to_dict()
574 for date, day in self.days.items():
575 if len(day.linked_todos) == 0 and len(day.comment) == 0:
576 dates_to_purge += [date]
577 for date in dates_to_purge:
579 self.write_text_to_db(json.dumps(self.to_dict()))
def selected_day(self):
    """Return the Day for ``self.selected_date``, creating it if absent.

    A missing entry is filled with the result of ``self.add_day`` before
    the lookup in ``self.days``.
    """
    date = self.selected_date
    if date not in self.days:
        self.days[date] = self.add_day(date=date)
    return self.days[date]
589 # table manipulations
591 def add_day(self, date, dict_source=None):
592 day = Day.from_dict(self, dict_source, date) if dict_source else Day(self, date)
593 self.days[date] = day
596 def add_task(self, id_=None, dict_source=None):
597 id_ = id_ if id_ else str(uuid4())
598 t = Task.from_dict(self, dict_source, id_) if dict_source else Task(self, id_)
602 def fork_task(self, id_):
603 origin = self.tasks[id_]
604 now = today_date(with_time=True)
605 fork_id = str(uuid4())
606 fork = Task(self, fork_id, {}, {}, {}, {}, origin.comment, id_)
607 self.tasks[fork_id] = fork
610 def update_task(self, id_, title, default_effort, tags, fences_adoptions, comment, dep_ids, depender_ids):
611 task = self.tasks[id_] if id_ in self.tasks.keys() else self.add_task(id_)
612 task.title.set(title)
613 task.default_effort.set(default_effort)
615 task.fences_adoptions.set(fences_adoptions)
616 task.comment = comment
617 task.dep_ids.set(dep_ids)
618 for depender in task.dependers:
619 if not depender.id_ in depender_ids:
620 depender_dep_ids = depender.dep_ids.now
621 depender_dep_ids.remove(task.id_)
622 depender.dep_ids.set(depender_dep_ids)
623 for depender_id in depender_ids:
624 depender = self.tasks[depender_id]
625 depender_dep_ids = depender.dep_ids.now
626 depender_dep_ids.add(task.id_)
627 depender.dep_ids.set(depender_dep_ids)
631 def add_todo(self, todo_dict=None, id_=None, task=None, efforts=None, parenthood='', force_new=False, creator=None):
632 make_children = parenthood != 'childless'
633 adopt_if_possible = parenthood == 'adoption'
634 id_ = id_ if id_ else str(uuid4())
636 todo = Todo.from_dict(self, todo_dict, id_)
637 self.todos[id_] = todo
638 elif task and efforts:
639 todo = Todo(self, id_, task, efforts=efforts)
640 print("DEBUG calling add_todo for", task.title.now, id_)
641 print("debug creator:", creator)
642 self.todos[id_] = todo
644 creator.deps += [todo]
646 if make_children and todo.latest_date in self.days.keys():
647 for dep_task in task.deps:
648 # if Todo expects dependencies, adopt any compatibles found in DB.selected_date
649 # before creating new depended Todos
652 if adopt_if_possible:
653 print("debug trying adoption for", dep_task.title.now)
654 # adoptable_todos = [t for t in self.days[todo.latest_date].linked_todos_as_list
655 # if t.task.id_ == dep_task.id_]
656 # if len(adoptable_todos) > 0:
657 # dep_todo = adoptable_todos[0]
658 dep_todo = todo.closest_adoptable(dep_task.id_)
659 print("debug GOT ADOPTION", dep_todo)
661 # dep_todo = self.add_todo(task=dep_task, efforts=efforts, force_new)
662 print("debug no adoption for", dep_task.title.now, id_)
663 dep_todo = self.add_todo(task=dep_task, efforts=efforts, parenthood=parenthood, creator=todo)
664 todo.deps += [dep_todo]
667 for date in todo.efforts.keys():
668 if date in self.days.keys(): # possibly not all Days have been initialized yet
669 self.days[date].linked_todos_as_list += [todo]
672 def _update_todo_shared(self, id_, done, comment, importance):
673 todo = self.todos[id_]
675 todo.comment = comment
676 todo.importance = importance
def update_todo_for_day(self, id_, date, effort, done, comment, importance):
    """Apply the shared Todo updates, then record ``effort`` for ``date``.

    ``done``, ``comment`` and ``importance`` are handed to
    ``self._update_todo_shared``; the Todo it returns gets
    ``efforts[date]`` set to ``effort``.
    """
    self._update_todo_shared(id_, done, comment, importance).efforts[date] = effort
683 def update_todo(self, id_, efforts, done, comment, tags, importance, deps, depender_ids):
684 todo = self._update_todo_shared(id_, done, comment, importance)
685 if len(efforts) == 0 and not todo.deps:
686 raise PlomException('todo must have at least one effort!')
687 todo.efforts = efforts
688 for date in todo.efforts.keys():
689 if not date in self.days.keys():
690 self.add_day(date=date)
691 if not self in self.days[date].linked_todos_as_list:
692 self.days[date].linked_todos_as_list += [todo]
695 for depender_id in depender_ids:
696 depender = self.todos[depender_id]
697 depender.dep_ids.add(todo.id_)
698 for depender in todo.dependers:
699 if not depender.id_ in depender_ids:
700 depender.dep_ids.remove(todo.id_)
703 def delete_todo(self, id_):
704 todo = self.todos[id_]
706 for date in todo.efforts.keys():
707 dates_to_delete += [date]
708 for date in dates_to_delete:
709 self.delete_effort(todo, date, force=True)
710 for depender in todo.dependers:
711 depender.dep_ids.remove(todo.id_)
714 def delete_task(self, id_):
def delete_effort(self, todo, date, force=False):
    """Unlink ``todo`` from the Day at ``date`` and drop its effort entry.

    Raises PlomException when this would remove the todo's only effort,
    unless ``force`` is truthy.
    """
    if not force:
        if len(todo.efforts) == 1:
            raise PlomException('todo must retain at least one effort!')
    day = self.days[date]
    day.linked_todos_as_list.remove(todo)
    del todo.efforts[date]
def get_message(self, message):
    """Render the 'message.html' template with ``message`` and return the result."""
    template = j2env.get_template('message.html')
    return template.render(message=message)
def get_calendar_export(self, start_date_str, end_date_str):
    """Render the 'calendar_export.html' template for the given date range.

    The days passed to the template are whatever
    ``self.init_calendar_items`` returns for the two limit strings.
    """
    calendar_days = self.init_calendar_items(start_date_str, end_date_str)
    template = j2env.get_template('calendar_export.html')
    return template.render(days=calendar_days)
732 def get_calendar(self, start_date_str, end_date_str):
733 days_to_show = self.init_calendar_items(start_date_str, end_date_str)
734 return j2env.get_template('calendar.html').render(
735 selected_date=self.selected_date,
737 start_date=start_date_str,
738 end_date=end_date_str)
740 def get_day_todos(self, undone_sort_order=None, done_sort_order=None, is_tree_shaped=False, todo_parenthood=None):
741 legal_undone_sort_keys = {'title', 'sort_done', 'default_effort', 'importance'}
742 legal_done_sort_keys = {'title', 'effort_at_selected_date', 'family_effort'}
744 current_date = datetime.strptime(self.selected_date, DATE_FORMAT)
745 prev_date = current_date - timedelta(days=1)
746 prev_date_str = prev_date.strftime(DATE_FORMAT)
747 next_date = current_date + timedelta(days=1)
748 next_date_str = next_date.strftime(DATE_FORMAT)
750 adoptable_past_todos = []
751 for todo in [t for t in self.todos.values()
754 and t.earliest_date < self.selected_date]:
755 adoptable_past_todos += [todo]
756 undone_todos = self.selected_day.sorted_todos(False, is_tree_shaped, undone_sort_order,
757 legal_undone_sort_keys)
758 done_todos = self.selected_day.sorted_todos(True, is_tree_shaped, done_sort_order,
759 legal_done_sort_keys)
761 return j2env.get_template('day_todos.html').render(
762 day=self.selected_day,
764 filter_and=self.tag_filter_and,
765 filter_not=self.tag_filter_not,
766 prev_date=prev_date_str,
767 adoptable_past_todos=adoptable_past_todos,
768 next_date=next_date_str,
769 all_tasks=[t for t in self.tasks.values()],
770 undone_todos=undone_todos,
771 done_todos=done_todos,
772 is_tree_shaped=is_tree_shaped,
773 undone_sort=undone_sort_order,
774 done_sort=done_sort_order,
775 parenthood=todo_parenthood)
777 def get_todo(self, id_, parenthood):
778 todo = self.todos[id_]
779 filtered_tasks = [t for t in self.tasks.values()
781 filtered_todos = [t for t in self.todos.values()
783 and t not in todo.deps]
785 legal_dates = list(todo.efforts.keys())
786 date_filtered_todos = []
787 for date in legal_dates:
788 for filtered_todo in filtered_todos:
789 if filtered_todo in date_filtered_todos:
791 if date in filtered_todo.efforts.keys():
792 date_filtered_todos += [filtered_todo]
795 for dep in todo.task.deps:
796 dep_slots += [{'task': dep,
797 'todos': [t for t in todo.deps if t.task == dep]}]
800 for dep in todo.task.deps:
801 suggested_todos[dep.id_] = [t for t in date_filtered_todos if t.task.id_ == dep.id_]
802 additional_deps = [t for t in todo.deps if not t.task in todo.task.deps]
804 return j2env.get_template('todo.html').render(
807 filtered_todos=date_filtered_todos,
808 filtered_tasks=filtered_tasks,
810 suggested_todos=suggested_todos,
811 additional_deps=additional_deps,
812 parentood=parenthood,
815 def get_task(self, id_):
817 if not id_ in self.tasks.keys():
818 raise PlomException('no Task for ID')
819 task = self.tasks[id_]
821 task = self.add_task()
823 task.default_effort.set(1.0)
824 filtered_tasks = [t for t in self.tasks.values()
826 and (t not in task.deps)]
828 return j2env.get_template('task.html').render(
829 selected_date=self.selected_date,
831 filtered_tasks=filtered_tasks,
834 def get_tasks(self, search, sort_order=None):
836 for task in [t for t in self.tasks.values() if (not search) or t.matches(search)]:
837 filtered_tasks += [task]
839 sort_column = sort_order
840 if sort_order and '-' == sort_order[0]:
842 sort_column = sort_order[1:]
843 if sort_column == 'title':
844 filtered_tasks.sort(key=lambda t: t.title.now)
845 elif sort_column == 'default_effort':
846 filtered_tasks.sort(key=lambda t: t.default_effort.now)
847 elif sort_column == 'weight':
848 filtered_tasks.sort(key=lambda t: t.deps_weight)
849 elif sort_column == 'latest_effort_date':
850 filtered_tasks.sort(key=lambda t: t.latest_effort_date)
852 filtered_tasks.reverse()
853 return j2env.get_template('tasks.html').render(
855 tasks=filtered_tasks,
857 filter_and=self.tag_filter_and,
858 filter_not=self.tag_filter_not,
863 def post_todo(self, id_, postvars, todo_parenthood):
864 if postvars.has('delete') and (not id_ in self.todos.keys()):
865 raise PlomException('can only do this on Todo that already exists')
866 old_todo = self.todos[id_] if id_ in self.todos.keys() else None
867 latest_date = self.selected_date
869 for i, date in enumerate(postvars.get_all('effort_date', [])):
873 efforts[date] = postvars.get_at_index('effort', i, float_if_possible=True)
874 if postvars.has('delete'):
875 raise PlomException('can only do this on Task that already exists')
876 has_day_effort = len([e for e in efforts.values() if e is not None]) > 0
877 if postvars.has('done')\
878 or postvars.get('comment')\
879 or postvars.get_all('tag', [])\
881 raise PlomException('will not remove todo of preserve-worthy values')
882 self.delete_todo(id_)
884 elif postvars.has('update'):
885 if postvars.has('delete_effort'):
886 for date in postvars.get_all('delete_effort'):
887 self.delete_effort(old_todo, date)
889 deps = [self.todos[adopt_id] for adopt_id in postvars.get_all('adopt_dep', [])
890 if adopt_id in self.todos.keys()]
891 birth_dep_ids = postvars.get_all('birth_dep', [])
892 for bad_id in [bad_id for bad_id in birth_dep_ids if not bad_id in self.tasks.keys()]:
893 raise PlomException('submitted illegal dep ID')
894 tasks_to_birth = [self.tasks[dep_id] for dep_id in birth_dep_ids]
895 for task in tasks_to_birth:
896 deps += [self.add_todo(task=task,
897 efforts={latest_date: None},
898 parenthood=todo_parenthood)]
899 depender_ids = postvars.get_all('depender', [])
900 self.update_todo(id_=id_,
902 done=postvars.has('done'),
903 comment=postvars.get('comment', ''),
904 tags=postvars.get_all('tag', []),
905 importance=postvars.get('importance', float_if_possible=True),
907 depender_ids=depender_ids)
910 def post_task(self, id_, postvars):
911 if (postvars.has('delete') or postvars.has('fork')) and (not id_ in self.tasks.keys()):
912 raise PlomException('can only do this on Task that already exists')
913 if postvars.has('delete'):
914 if [t for t in self.todos.values() if id_ == t.task.id_]:
915 raise PlomException('will not remove Task describing existing Todos')
916 if postvars.get('title', '')\
917 or postvars.get_all('tag', [])\
918 or postvars.get_all('dep', [])\
919 or postvars.get('comment', ''):
920 raise PlomException('will not remove Task of preserve-worthy values')
921 self.delete_task(id_)
923 elif postvars.has('update'):
924 dep_ids = postvars.get_all('dep', [])
925 for bad_id in [bad_id for bad_id in dep_ids if not bad_id in self.tasks.keys()]:
926 raise PlomException('submitted illegal dep ID')
927 depender_ids = postvars.get_all('depender', [])
928 for bad_id in [bad_id_ for bad_id in depender_ids if not bad_id in self.tasks.keys()]:
929 raise PlomException('submitted illegal dep ID')
930 task = self.update_task(
932 title=postvars.get('title', ''),
933 default_effort=postvars.get('default_effort', float_if_possible=True),
934 tags=postvars.get_all('tag', []),
935 comment=postvars.get('comment', ''),
936 fences_adoptions=postvars.get('fences_adoptions', False),
938 depender_ids=depender_ids)
939 if postvars.has('add_as_todo'):
940 self.add_todo(task=task, efforts={postvars.get('new_todo_date'): None})
941 elif postvars.has('fork'):
942 t = self.fork_task(id_)
946 def post_day_todos(self, postvars, todo_parenthood):
947 if not postvars.has('update'):
949 self.selected_date = postvars.get('date')
950 self.selected_day.comment = postvars.get('day_comment', '')
951 task_id = postvars.get('choose_task', None)
953 if task_id not in self.tasks.keys():
954 raise PlomException('illegal task ID entered')
955 self.add_todo(task=self.tasks[task_id], efforts={self.selected_date: None},
956 parenthood=todo_parenthood)
957 for todo_id in postvars.get_all('choose_todo', []):
958 self.todos[todo_id].efforts[self.selected_date] = None
959 for i, todo_id in enumerate(postvars.get_all('todo_id', [])):
960 old_todo = self.todos[todo_id]
961 done = todo_id in postvars.get_all('done', [])
962 day_effort_input = postvars.get_at_index('effort', i, '')
963 day_effort = float(day_effort_input) if len(day_effort_input) > 0 else None
964 comment = postvars.get_at_index('effort_comment', i, '')
965 if (day_effort is not None) and (not done) and day_effort < 0 and 0 == len(comment):
966 if len(old_todo.efforts) > 1:
967 self.delete_effort(old_todo, self.selected_date)
969 self.delete_todo(todo_id)
971 importance = float(postvars.get_at_index('importance', i))
973 and old_todo.done == done\
974 and old_todo.day_effort == day_effort\
975 and comment == old_todo.comment\
976 and old_todo.importance == importance:
978 self.update_todo_for_day(
988 def init_calendar_items(self, start_date_str, end_date_str):
989 self.tag_filter_and = ['calendar']
990 self.tag_filter_not = ['deleted']
992 todays_date_obj = datetime.strptime(today_date(), DATE_FORMAT)
993 yesterdays_date_obj = todays_date_obj - timedelta(1)
994 def get_day_limit_obj(index, day_limit_string):
995 date_obj = datetime.strptime(sorted(self.days.keys())[index], DATE_FORMAT)
996 if day_limit_string and len(day_limit_string) > 0:
997 if day_limit_string in {'today', 'yesterday'}:
998 date_obj = todays_date_obj if day_limit_string == 'today' else yesterdays_date_obj
1000 date_obj = datetime.strptime(day_limit_string, DATE_FORMAT)
1002 start_date_obj = get_day_limit_obj(0, start_date_str)
1003 end_date_obj = get_day_limit_obj(-1, end_date_str)
1006 for n in range(int((end_date_obj - start_date_obj).days) + 1):
1007 date_obj = start_date_obj + timedelta(n)
1008 date_str = date_obj.strftime(DATE_FORMAT)
1009 if date_str not in self.days.keys():
1010 days_to_show[date_str] = self.add_day(date_str)
1012 days_to_show[date_str] = self.days[date_str]
1013 days_to_show[date_str].month_title = date_obj.strftime('%B') if date_obj.day == 1 else None
1014 days_to_show[date_str].weekday = datetime.strptime(date_str, DATE_FORMAT).strftime('%A')[:2]
def __init__(self, parsed_url_query, cookie_db):
    """Parse the URL query string and keep a reference to the cookie store.

    parsed_url_query -- raw query string; ``parse_qs`` turns it into a
                        dict mapping each key to a list of values.
    cookie_db        -- stored by reference (not copied).
    """
    self.params = parse_qs(parsed_url_query)
    self.cookie_db = cookie_db
1025 def get(self, key, default=None, as_bool=False):
1026 param = self.params.get(key, [default])[0]
1030 elif param is not None:
def cookie_key_from_params_key(self, prefix, key):
    """Build the cookie key: ``prefix:key``, or bare ``key`` if prefix is falsy."""
    if not prefix:
        return key
    return f'{prefix}:{key}'
1037 def _get_cookied(self, cookie_key, params):
1038 if params in [['-'], '-']:
1040 if cookie_key in self.cookie_db.keys():
1041 del self.cookie_db[cookie_key]
1042 if params is None and cookie_key in self.cookie_db.keys():
1043 params = self.cookie_db[cookie_key]
1044 if params is not None:
1045 self.cookie_db[cookie_key] = params
def get_cookied(self, key, default=None, prefix=None, as_bool=False):
    """Fetch URL parameter ``key`` and pass it through the cookie store.

    The value from ``self.get(key, default, as_bool)`` is handed to
    ``self._get_cookied`` together with the cookie key derived from
    ``prefix`` and ``key``; that call's result is returned.
    """
    return self._get_cookied(
        self.cookie_key_from_params_key(prefix, key),
        self.get(key, default, as_bool),
    )
def get_cookied_chain(self, key, default=None, prefix=None):
    """Like ``get_cookied`` but for the whole value list of ``key``.

    Takes ``self.params.get(key, default)`` unmodified and passes it,
    with the cookie key derived from ``prefix`` and ``key``, to
    ``self._get_cookied``.
    """
    return self._get_cookied(
        self.cookie_key_from_params_key(prefix, key),
        self.params.get(key, default),
    )
1060 class PostvarsParser:
def __init__(self, postvars):
    """Wrap ``postvars`` for lookup; the object is stored by reference.

    NOTE(review): other methods index ``self.postvars[key][i]``, so this
    is presumably a dict of value lists as produced by ``parse_qs`` --
    confirm against the caller.
    """
    self.postvars = postvars
1066 return key in self.postvars.keys()
def get(self, key, on_empty=None, float_if_possible=False):
    """Return the value for ``key`` at index 0, via ``get_at_index``.

    ``on_empty`` and ``float_if_possible`` are forwarded unchanged.
    """
    first_index = 0
    return self.get_at_index(key, first_index, on_empty, float_if_possible)
1071 def get_at_index(self, key, i, on_empty=None, float_if_possible=False):
1072 if self.has(key) and len(self.postvars[key][i]) > 0:
1073 val = self.postvars[key][i]
1076 if float_if_possible and val is not None:
1081 def get_all(self, key, on_empty=None):
1082 if self.has(key) and len(self.postvars[key]) > 0:
1083 return [v for v in self.postvars[key] if len(v) > 0]
def set(self, key, value):
    """Store ``value`` under ``key``, wrapped in a one-element list.

    Any previous value list for ``key`` is replaced.
    """
    self.postvars[key] = [value]
1091 class TodoHandler(PlomHandler):
1093 def config_init(self):
1095 'cookie_name': 'todo_cookie',
def app_init(self, handler):
    """Register the todo app's routes on ``handler`` and describe the app.

    GET on '/todo' goes to ``self.show_db`` and POST to ``self.write_db``.
    Returns the pair ``('todo', config)`` where ``config`` holds the
    cookie name and cookie path.
    """
    default_path = '/todo'
    # Callbacks are resolved one at a time, right before each registration.
    for verb, callback_name in (('GET', 'show_db'), ('POST', 'write_db')):
        handler.add_route(verb, default_path, getattr(self, callback_name))
    cookie_config = {'cookie_name': 'todo_cookie', 'cookie_path': default_path}
    return 'todo', cookie_config
1106 self.try_do(self.write_db)
1109 from urllib.parse import urlencode
1110 config = self.apps['todo'] if hasattr(self, 'apps') else self.config_init()
1111 parsed_url = urlparse(self.path)
1112 site = path_split(parsed_url.path)[1]
1113 length = int(self.headers['content-length'])
1114 postvars = PostvarsParser(parse_qs(self.rfile.read(length).decode(), keep_blank_values=1))
1118 # if we do encounter a filter post, we repost it (and if empty, the emptying command '-')
1119 for param_name, filter_db_name in {('and_tag', 'tag_filter_and'),
1120 ('not_tag', 'tag_filter_not')}:
1121 filter_db = getattr(db, filter_db_name)
1122 if postvars.has(param_name):
1123 for target in postvars.get_all(param_name, []):
1124 if len(target) > 0 and not target in filter_db:
1125 filter_db += [target]
1126 if len(filter_db) == 0:
1127 redir_params += [(param_name, '-')]
1128 redir_params += [(param_name, f) for f in filter_db]
1129 if site in {'calendar', 'todo'}:
1130 redir_params += [('end', postvars.get('end', '-'))]
1131 redir_params += [('start', postvars.get('start', '-'))]
1132 if site in {'day_todos', 'todo'}:
1133 todo_parenthood = postvars.get('parenthood')
1134 redir_params += [('parenthood', todo_parenthood)]
1135 if site in {'todo', 'task'}:
1136 id_ = postvars.get('id')
1139 redir_params += [('search', postvars.get('search', ''))]
1140 elif 'todo' == site:
1141 if db.post_todo(id_, postvars, todo_parenthood):
1142 redir_params += [('id', id_)]
1145 elif 'task' == site:
1146 ret = db.post_task(id_, postvars)
1148 redir_params += [('id', ret)]
1151 elif 'day_todos' == site:
1152 db.post_day_todos(postvars, todo_parenthood)
1153 redir_params += [('date', db.selected_date)]
1155 encoded_params = urlencode(redir_params)
1156 homepage = f'{site}?{encoded_params}'
1158 self.redirect(homepage)
1161 self.try_do(self.show_db)
1164 config = self.apps['todo'] if hasattr(self, 'apps') else self.config_init()
1165 parsed_url = urlparse(self.path)
1166 site = path_split(parsed_url.path)[1]
1168 cookie_db = self.get_cookie_db(config['cookie_name'])
1169 params = ParamsParser(parsed_url.query, cookie_db)
1170 selected_date = tag_filter_and = tag_filter_not = None
1171 if site in {'day_todos', 'calendar', 'task'}:
1172 selected_date = params.get_cookied('date')
1173 if site in {'day_todos', 'tasks'}:
1174 tag_filter_and = params.get_cookied_chain('and_tag', prefix=site)
1175 tag_filter_not = params.get_cookied_chain('not_tag', prefix=site)
1176 if site in {'day_todos', 'todo'}:
1177 todo_parenthood = params.get_cookied('parenthood', prefix=site)
1178 if site in {'calendar', 'calendar_export', ''}:
1179 start_date = params.get_cookied('start', prefix=site)
1180 end_date = params.get_cookied('end', prefix=site)
1181 if site in {'todo', 'task'}:
1182 id_ = params.get('id')
1184 db = TodoDB(selected_date, tag_filter_and, tag_filter_not)
1185 if 'reset_cookie' == site:
1186 cookie_db = { # sensible defaults
1187 params.cookie_key_from_params_key('day_todos', 'tree'): True,
1188 params.cookie_key_from_params_key('todo', 'and_tag'): ['default'],
1189 params.cookie_key_from_params_key('todo', 'not_tag'): ['ignore'],
1190 params.cookie_key_from_params_key('todo', 'start'): 'yesterday',
1191 params.cookie_key_from_params_key('todo', 'end'): 'today'}
1192 page = db.get_message('cookie unset!')
1193 elif 'day_todos' == site:
1194 tree_view = params.get_cookied('tree', prefix=site, as_bool=True)
1195 undone_sort_order = params.get_cookied('undone_sort', prefix=site)
1196 done_sort_order = params.get_cookied('done_sort', prefix=site)
1197 page = db.get_day_todos(undone_sort_order, done_sort_order, tree_view, todo_parenthood)
1198 elif site == 'todo':
1199 page = db.get_todo(id_, parenthood=todo_parenthood)
1200 elif 'task' == site:
1201 page = db.get_task(id_)
1202 elif 'tasks' == site:
1203 sort_order = params.get_cookied('sort', prefix=site)
1204 search = params.get('search', '')
1205 page = db.get_tasks(search, sort_order)
1206 elif 'add_task' == site:
1207 page = db.get_task(None)
1208 elif 'calendar_export' == site:
1209 page = db.get_calendar_export(start_date, end_date)
1210 else: # 'calendar' == site
1211 page = db.get_calendar(start_date, end_date)
1213 self.set_cookie(config['cookie_name'], config['cookie_path'], cookie_db)
1214 self.send_HTML(page)
# Script entry point: when run directly (not imported), start the server
# with TodoHandler on server_port. NOTE(review): server_port is defined
# outside the visible part of this file -- confirm it exists.
if __name__ == "__main__":
    run_server(server_port, TodoHandler)