home · contact · privacy
Extend Todo tests, overhaul Ctx library building.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from __future__ import annotations
3 from typing import Any, Callable
4 from base64 import b64encode, b64decode
5 from binascii import Error as binascii_Exception
6 from http.server import BaseHTTPRequestHandler
7 from http.server import HTTPServer
8 from urllib.parse import urlparse, parse_qs
9 from json import dumps as json_dumps
10 from os.path import split as path_split
11 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
12 from plomtask.dating import date_in_n_days
13 from plomtask.days import Day
14 from plomtask.exceptions import (HandledException, BadFormatException,
15                                  NotFoundException)
16 from plomtask.db import DatabaseConnection, DatabaseFile, CtxReferences
17 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
18 from plomtask.conditions import Condition
19 from plomtask.todos import Todo, TodoStepsNode
20
21 TEMPLATES_DIR = 'templates'
22
23
24 class TaskServer(HTTPServer):
25     """Variant of HTTPServer that knows .jinja as Jinja Environment."""
26
27     def __init__(self, db_file: DatabaseFile,
28                  *args: Any, **kwargs: Any) -> None:
29         super().__init__(*args, **kwargs)
30         self.db = db_file
31         self.headers: list[tuple[str, str]] = []
32         self._render_mode = 'html'
33         self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
34
35     def set_json_mode(self) -> None:
36         """Make server send JSON instead of HTML responses."""
37         self._render_mode = 'json'
38         self.headers += [('Content-Type', 'application/json')]
39
40     @staticmethod
41     def ctx_to_json(ctx: dict[str, object], conn: DatabaseConnection) -> str:
42         """Render ctx into JSON string."""
43
44         def walk_ctx(node: object, references: CtxReferences) -> Any:
45             if hasattr(node, 'into_reference'):
46                 if hasattr(node, 'id_') and node.id_ is not None:
47                     library_growing[0] = True
48                     return node.into_reference(references)
49             if hasattr(node, 'as_dict'):
50                 d = node.as_dict
51                 if '_references' in d:
52                     own_refs = d['_references']
53                     if own_refs.update(references):
54                         library_growing[0] = True
55                     del d['_references']
56                 return d
57             if isinstance(node, (list, tuple)):
58                 return [walk_ctx(x, references) for x in node]
59             if isinstance(node, dict):
60                 d = {}
61                 for k, v in node.items():
62                     d[k] = walk_ctx(v, references)
63                 return d
64             if isinstance(node, HandledException):
65                 return str(node)
66             return node
67
68         models = {}
69         for cls in [Day, Process, ProcessStep, Condition, Todo]:
70             models[cls.__name__] = cls
71         library: dict[str, dict[str | int, object]] = {}
72         references = CtxReferences({})
73         library_growing = [True]
74         while library_growing[0]:
75             library_growing[0] = False
76             for k, v in ctx.items():
77                 ctx[k] = walk_ctx(v, references)
78             for cls_name, ids in references.d.items():
79                 if cls_name not in library:
80                     library[cls_name] = {}
81                 for id_ in ids:
82                     cls = models[cls_name]
83                     assert hasattr(cls, 'can_create_by_id')
84                     if cls.can_create_by_id:
85                         assert hasattr(cls, 'by_id_or_create')
86                         d = cls.by_id_or_create(conn, id_).as_dict
87                     else:
88                         assert hasattr(cls, 'by_id')
89                         d = cls.by_id(conn, id_).as_dict
90                     del d['_references']
91                     library[cls_name][id_] = d
92                 references.d[cls_name] = []
93         ctx['_library'] = library
94         return json_dumps(ctx)
95
96     def render(self,
97                ctx: dict[str, object],
98                tmpl_name: str,
99                conn: DatabaseConnection
100                ) -> str:
101         """Render ctx according to self._render_mode.."""
102         tmpl_name = f'{tmpl_name}.{self._render_mode}'
103         if 'html' == self._render_mode:
104             template = self._jinja.get_template(tmpl_name)
105             return template.render(ctx)
106         return self.__class__.ctx_to_json(ctx, conn)
107
108
109 class InputsParser:
110     """Wrapper for validating and retrieving dict-like HTTP inputs."""
111
112     def __init__(self, dict_: dict[str, list[str]],
113                  strictness: bool = True) -> None:
114         self.inputs = dict_
115         self.strict = strictness  # return None on absence of key, or fail?
116
117     def get_str(self, key: str, default: str = '',
118                 ignore_strict: bool = False) -> str:
119         """Retrieve single/first string value of key, or default."""
120         if key not in self.inputs.keys() or 0 == len(self.inputs[key]):
121             if self.strict and not ignore_strict:
122                 raise NotFoundException(f'no value found for key {key}')
123             return default
124         return self.inputs[key][0]
125
126     def get_first_strings_starting(self, prefix: str) -> dict[str, str]:
127         """Retrieve dict of (first) strings at key starting with prefix."""
128         ret = {}
129         for key in [k for k in self.inputs.keys() if k.startswith(prefix)]:
130             ret[key] = self.inputs[key][0]
131         return ret
132
133     def get_int(self, key: str) -> int:
134         """Retrieve single/first value of key as int, error if empty."""
135         val = self.get_int_or_none(key)
136         if val is None:
137             raise BadFormatException(f'unexpected empty value for: {key}')
138         return val
139
140     def get_int_or_none(self, key: str) -> int | None:
141         """Retrieve single/first value of key as int, return None if empty."""
142         val = self.get_str(key, ignore_strict=True)
143         if val == '':
144             return None
145         try:
146             return int(val)
147         except ValueError as e:
148             msg = f'cannot int form field value for key {key}: {val}'
149             raise BadFormatException(msg) from e
150
151     def get_float(self, key: str) -> float:
152         """Retrieve float value of key from self.postvars."""
153         val = self.get_str(key)
154         try:
155             return float(val)
156         except ValueError as e:
157             msg = f'cannot float form field value for key {key}: {val}'
158             raise BadFormatException(msg) from e
159
160     def get_float_or_none(self, key: str) -> float | None:
161         """Retrieve float value of key from self.postvars, None if empty."""
162         val = self.get_str(key)
163         if '' == val:
164             return None
165         try:
166             return float(val)
167         except ValueError as e:
168             msg = f'cannot float form field value for key {key}: {val}'
169             raise BadFormatException(msg) from e
170
171     def get_all_str(self, key: str) -> list[str]:
172         """Retrieve list of string values at key."""
173         if key not in self.inputs.keys():
174             return []
175         return self.inputs[key]
176
177     def get_all_int(self, key: str) -> list[int]:
178         """Retrieve list of int values at key."""
179         all_str = self.get_all_str(key)
180         try:
181             return [int(s) for s in all_str if len(s) > 0]
182         except ValueError as e:
183             msg = f'cannot int a form field value for key {key} in: {all_str}'
184             raise BadFormatException(msg) from e
185
186     def get_all_floats_or_nones(self, key: str) -> list[float | None]:
187         """Retrieve list of float value at key, None if empty strings."""
188         ret: list[float | None] = []
189         for val in self.get_all_str(key):
190             if '' == val:
191                 ret += [None]
192             else:
193                 try:
194                     ret += [float(val)]
195                 except ValueError as e:
196                     msg = f'cannot float form field value for key {key}: {val}'
197                     raise BadFormatException(msg) from e
198         return ret
199
200
201 class TaskHandler(BaseHTTPRequestHandler):
202     """Handles single HTTP request."""
203     # pylint: disable=too-many-public-methods
204     server: TaskServer
205     conn: DatabaseConnection
206     _site: str
207     _form_data: InputsParser
208     _params: InputsParser
209
210     def _send_page(self,
211                    ctx: dict[str, Any],
212                    tmpl_name: str,
213                    code: int = 200
214                    ) -> None:
215         """Send ctx as proper HTTP response."""
216         body = self.server.render(ctx, tmpl_name, self.conn)
217         self.send_response(code)
218         for header_tuple in self.server.headers:
219             self.send_header(*header_tuple)
220         self.end_headers()
221         self.wfile.write(bytes(body, 'utf-8'))
222
223     @staticmethod
224     def _request_wrapper(http_method: str, not_found_msg: str
225                          ) -> Callable[..., Callable[[TaskHandler], None]]:
226         """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
227
228         Among other things, conditionally cleans all caches, but only on POST
229         requests, as only those are expected to change the states of objects
230         that may be cached, and certainly only those are expected to write any
231         changes to the database. We want to call them as early though as
232         possible here, either exactly after the specific request handler
233         returns successfully, or right after any exception is triggered –
234         otherwise, race conditions become plausible.
235
236         Note that otherwise any POST attempt, even a failed one, may end in
237         problematic inconsistencies:
238
239         - if the POST handler experiences an Exception, changes to objects
240           won't get written to the DB, but the changed objects may remain in
241           the cache and affect other objects despite their possibly illegal
242           state
243
244         - even if an object was just saved to the DB, we cannot be sure its
245           current state is completely identical to what we'd get if loading it
246           fresh from the DB (e.g. currently Process.n_owners is only updated
247           when loaded anew via .from_table_row, nor is its state written to
248           the DB by .save; a questionable design choice, but proof that we
249           have no guarantee that objects' .save stores all their states we'd
250           prefer at their most up-to-date.
251         """
252
253         def clear_caches() -> None:
254             for cls in (Day, Todo, Condition, Process, ProcessStep):
255                 assert hasattr(cls, 'empty_cache')
256                 cls.empty_cache()
257
258         def decorator(f: Callable[..., str | None]
259                       ) -> Callable[[TaskHandler], None]:
260             def wrapper(self: TaskHandler) -> None:
261                 # pylint: disable=protected-access
262                 # (because pylint here fails to detect the use of wrapper as a
263                 # method to self with respective access privileges)
264                 try:
265                     self.conn = DatabaseConnection(self.server.db)
266                     parsed_url = urlparse(self.path)
267                     self._site = path_split(parsed_url.path)[1]
268                     params = parse_qs(parsed_url.query, strict_parsing=True)
269                     self._params = InputsParser(params, False)
270                     handler_name = f'do_{http_method}_{self._site}'
271                     if hasattr(self, handler_name):
272                         handler = getattr(self, handler_name)
273                         redir_target = f(self, handler)
274                         if 'POST' == http_method:
275                             clear_caches()
276                         if redir_target:
277                             self.send_response(302)
278                             self.send_header('Location', redir_target)
279                             self.end_headers()
280                     else:
281                         msg = f'{not_found_msg}: {self._site}'
282                         raise NotFoundException(msg)
283                 except HandledException as error:
284                     if 'POST' == http_method:
285                         clear_caches()
286                     ctx = {'msg': error}
287                     self._send_page(ctx, 'msg', error.http_code)
288                 finally:
289                     self.conn.close()
290             return wrapper
291         return decorator
292
293     @_request_wrapper('GET', 'Unknown page')
294     def do_GET(self, handler: Callable[[], str | dict[str, object]]
295                ) -> str | None:
296         """Render page with result of handler, or redirect if result is str."""
297         tmpl_name = f'{self._site}'
298         ctx_or_redir_target = handler()
299         if isinstance(ctx_or_redir_target, str):
300             return ctx_or_redir_target
301         self._send_page(ctx_or_redir_target, tmpl_name)
302         return None
303
304     @_request_wrapper('POST', 'Unknown POST target')
305     def do_POST(self, handler: Callable[[], str]) -> str:
306         """Handle POST with handler, prepare redirection to result."""
307         length = int(self.headers['content-length'])
308         postvars = parse_qs(self.rfile.read(length).decode(),
309                             keep_blank_values=True, strict_parsing=True)
310         self._form_data = InputsParser(postvars)
311         redir_target = handler()
312         self.conn.commit()
313         return redir_target
314
315     # GET handlers
316
317     @staticmethod
318     def _get_item(target_class: Any
319                   ) -> Callable[..., Callable[[TaskHandler],
320                                               dict[str, object]]]:
321         def decorator(f: Callable[..., dict[str, object]]
322                       ) -> Callable[[TaskHandler], dict[str, object]]:
323             def wrapper(self: TaskHandler) -> dict[str, object]:
324                 # pylint: disable=protected-access
325                 # (because pylint here fails to detect the use of wrapper as a
326                 # method to self with respective access privileges)
327                 id_ = self._params.get_int_or_none('id')
328                 if target_class.can_create_by_id:
329                     item = target_class.by_id_or_create(self.conn, id_)
330                 else:
331                     item = target_class.by_id(self.conn, id_)
332                 return f(self, item)
333             return wrapper
334         return decorator
335
336     def do_GET_(self) -> str:
337         """Return redirect target on GET /."""
338         return '/day'
339
340     def _do_GET_calendar(self) -> dict[str, object]:
341         """Show Days from ?start= to ?end=.
342
343         Both .do_GET_calendar and .do_GET_calendar_txt refer to this to do the
344         same, the only difference being the HTML template they are rendered to,
345         which .do_GET selects from their method name.
346         """
347         start, end = self._params.get_str('start'), self._params.get_str('end')
348         end = end if end else date_in_n_days(366)
349         days, start, end = Day.by_date_range_with_limits(self.conn,
350                                                          (start, end), 'id')
351         days = Day.with_filled_gaps(days, start, end)
352         today = date_in_n_days(0)
353         return {'start': start, 'end': end, 'days': days, 'today': today}
354
355     def do_GET_calendar(self) -> dict[str, object]:
356         """Show Days from ?start= to ?end= – normal view."""
357         return self._do_GET_calendar()
358
359     def do_GET_calendar_txt(self) -> dict[str, object]:
360         """Show Days from ?start= to ?end= – minimalist view."""
361         return self._do_GET_calendar()
362
363     def do_GET_day(self) -> dict[str, object]:
364         """Show single Day of ?date=."""
365         date = self._params.get_str('date', date_in_n_days(0))
366         day = Day.by_id_or_create(self.conn, date)
367         make_type = self._params.get_str('make_type')
368         conditions_present = []
369         enablers_for = {}
370         disablers_for = {}
371         for todo in day.todos:
372             for condition in todo.conditions + todo.blockers:
373                 if condition not in conditions_present:
374                     conditions_present += [condition]
375                     enablers_for[condition.id_] = [p for p in
376                                                    Process.all(self.conn)
377                                                    if condition in p.enables]
378                     disablers_for[condition.id_] = [p for p in
379                                                     Process.all(self.conn)
380                                                     if condition in p.disables]
381         seen_todos: set[int] = set()
382         top_nodes = [t.get_step_tree(seen_todos)
383                      for t in day.todos if not t.parents]
384         return {'day': day,
385                 'top_nodes': top_nodes,
386                 'make_type': make_type,
387                 'enablers_for': enablers_for,
388                 'disablers_for': disablers_for,
389                 'conditions_present': conditions_present,
390                 'processes': Process.all(self.conn)}
391
392     @_get_item(Todo)
393     def do_GET_todo(self, todo: Todo) -> dict[str, object]:
394         """Show single Todo of ?id=."""
395
396         def walk_process_steps(id_: int,
397                                process_step_nodes: list[ProcessStepsNode],
398                                steps_nodes: list[TodoStepsNode]) -> None:
399             for process_step_node in process_step_nodes:
400                 id_ += 1
401                 node = TodoStepsNode(id_, None, process_step_node.process, [])
402                 steps_nodes += [node]
403                 walk_process_steps(id_, list(process_step_node.steps.values()),
404                                    node.children)
405
406         def walk_todo_steps(id_: int, todos: list[Todo],
407                             steps_nodes: list[TodoStepsNode]) -> None:
408             for todo in todos:
409                 matched = False
410                 for match in [item for item in steps_nodes
411                               if item.process
412                               and item.process == todo.process]:
413                     match.todo = todo
414                     matched = True
415                     for child in match.children:
416                         child.fillable = True
417                     walk_todo_steps(id_, todo.children, match.children)
418                 if not matched:
419                     id_ += 1
420                     node = TodoStepsNode(id_, todo, None, [])
421                     steps_nodes += [node]
422                     walk_todo_steps(id_, todo.children, node.children)
423
424         def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
425                                     ) -> set[int]:
426             ids = set()
427             for node in steps_nodes:
428                 if not node.todo:
429                     assert isinstance(node.process, Process)
430                     assert isinstance(node.process.id_, int)
431                     ids.add(node.process.id_)
432                 ids = ids | collect_adoptables_keys(node.children)
433             return ids
434
435         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
436         process_tree = todo.process.get_steps(self.conn, None)
437         steps_todo_to_process: list[TodoStepsNode] = []
438         walk_process_steps(0, list(process_tree.values()),
439                            steps_todo_to_process)
440         for steps_node in steps_todo_to_process:
441             steps_node.fillable = True
442         walk_todo_steps(len(steps_todo_to_process), todo_steps,
443                         steps_todo_to_process)
444         adoptables: dict[int, list[Todo]] = {}
445         any_adoptables = [Todo.by_id(self.conn, t.id_)
446                           for t in Todo.by_date(self.conn, todo.date)
447                           if t.id_ is not None
448                           and t != todo]
449         for id_ in collect_adoptables_keys(steps_todo_to_process):
450             adoptables[id_] = [t for t in any_adoptables
451                                if t.process.id_ == id_]
452         return {'todo': todo, 'steps_todo_to_process': steps_todo_to_process,
453                 'adoption_candidates_for': adoptables,
454                 'process_candidates': Process.all(self.conn),
455                 'todo_candidates': any_adoptables,
456                 'condition_candidates': Condition.all(self.conn)}
457
458     def do_GET_todos(self) -> dict[str, object]:
459         """Show Todos from ?start= to ?end=, of ?process=, ?comment= pattern"""
460         sort_by = self._params.get_str('sort_by')
461         start = self._params.get_str('start')
462         end = self._params.get_str('end')
463         process_id = self._params.get_int_or_none('process_id')
464         comment_pattern = self._params.get_str('comment_pattern')
465         todos = []
466         ret = Todo.by_date_range_with_limits(self.conn, (start, end))
467         todos_by_date_range, start, end = ret
468         todos = [t for t in todos_by_date_range
469                  if comment_pattern in t.comment
470                  and ((not process_id) or t.process.id_ == process_id)]
471         sort_by = Todo.sort_by(todos, sort_by)
472         return {'start': start, 'end': end, 'process_id': process_id,
473                 'comment_pattern': comment_pattern, 'todos': todos,
474                 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
475
476     def do_GET_conditions(self) -> dict[str, object]:
477         """Show all Conditions."""
478         pattern = self._params.get_str('pattern')
479         sort_by = self._params.get_str('sort_by')
480         conditions = Condition.matching(self.conn, pattern)
481         sort_by = Condition.sort_by(conditions, sort_by)
482         return {'conditions': conditions,
483                 'sort_by': sort_by,
484                 'pattern': pattern}
485
486     @_get_item(Condition)
487     def do_GET_condition(self, c: Condition) -> dict[str, object]:
488         """Show Condition of ?id=."""
489         ps = Process.all(self.conn)
490         return {'condition': c, 'is_new': c.id_ is None,
491                 'enabled_processes': [p for p in ps if c in p.conditions],
492                 'disabled_processes': [p for p in ps if c in p.blockers],
493                 'enabling_processes': [p for p in ps if c in p.enables],
494                 'disabling_processes': [p for p in ps if c in p.disables]}
495
496     @_get_item(Condition)
497     def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
498         """Show title history of Condition of ?id=."""
499         return {'condition': c}
500
501     @_get_item(Condition)
502     def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
503         """Show description historys of Condition of ?id=."""
504         return {'condition': c}
505
506     @_get_item(Process)
507     def do_GET_process(self, process: Process) -> dict[str, object]:
508         """Show Process of ?id=."""
509         owner_ids = self._params.get_all_int('step_to')
510         owned_ids = self._params.get_all_int('has_step')
511         title_64 = self._params.get_str('title_b64')
512         if title_64:
513             try:
514                 title = b64decode(title_64.encode()).decode()
515             except binascii_Exception as exc:
516                 msg = 'invalid base64 for ?title_b64='
517                 raise BadFormatException(msg) from exc
518             process.title.set(title)
519         preset_top_step = None
520         owners = process.used_as_step_by(self.conn)
521         for step_id in owner_ids:
522             owners += [Process.by_id(self.conn, step_id)]
523         for process_id in owned_ids:
524             Process.by_id(self.conn, process_id)  # to ensure ID exists
525             preset_top_step = process_id
526         return {'process': process, 'is_new': process.id_ is None,
527                 'preset_top_step': preset_top_step,
528                 'steps': process.get_steps(self.conn), 'owners': owners,
529                 'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
530                 'process_candidates': Process.all(self.conn),
531                 'condition_candidates': Condition.all(self.conn)}
532
533     @_get_item(Process)
534     def do_GET_process_titles(self, p: Process) -> dict[str, object]:
535         """Show title history of Process of ?id=."""
536         return {'process': p}
537
538     @_get_item(Process)
539     def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
540         """Show description historys of Process of ?id=."""
541         return {'process': p}
542
543     @_get_item(Process)
544     def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
545         """Show default effort history of Process of ?id=."""
546         return {'process': p}
547
548     def do_GET_processes(self) -> dict[str, object]:
549         """Show all Processes."""
550         pattern = self._params.get_str('pattern')
551         sort_by = self._params.get_str('sort_by')
552         processes = Process.matching(self.conn, pattern)
553         sort_by = Process.sort_by(processes, sort_by)
554         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
555
556     # POST handlers
557
558     @staticmethod
559     def _delete_or_post(target_class: Any, redir_target: str = '/'
560                         ) -> Callable[..., Callable[[TaskHandler], str]]:
561         def decorator(f: Callable[..., str]
562                       ) -> Callable[[TaskHandler], str]:
563             def wrapper(self: TaskHandler) -> str:
564                 # pylint: disable=protected-access
565                 # (because pylint here fails to detect the use of wrapper as a
566                 # method to self with respective access privileges)
567                 id_ = self._params.get_int_or_none('id')
568                 for _ in self._form_data.get_all_str('delete'):
569                     if id_ is None:
570                         msg = 'trying to delete non-saved ' +\
571                                 f'{target_class.__name__}'
572                         raise NotFoundException(msg)
573                     item = target_class.by_id(self.conn, id_)
574                     item.remove(self.conn)
575                     return redir_target
576                 if target_class.can_create_by_id:
577                     item = target_class.by_id_or_create(self.conn, id_)
578                 else:
579                     item = target_class.by_id(self.conn, id_)
580                 return f(self, item)
581             return wrapper
582         return decorator
583
584     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
585         """Update history timestamps for VersionedAttribute."""
586         id_ = self._params.get_int_or_none('id')
587         item = cls.by_id(self.conn, id_)
588         attr = getattr(item, attr_name)
589         for k, v in self._form_data.get_first_strings_starting('at:').items():
590             old = k[3:]
591             if old[19:] != v:
592                 attr.reset_timestamp(old, f'{v}.0')
593         attr.save(self.conn)
594         return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
595
596     def do_POST_day(self) -> str:
597         """Update or insert Day of date and Todos mapped to it."""
598         # pylint: disable=too-many-locals
599         try:
600             date = self._params.get_str('date')
601             day_comment = self._form_data.get_str('day_comment')
602             make_type = self._form_data.get_str('make_type')
603         except NotFoundException as e:
604             raise BadFormatException from e
605         old_todos = self._form_data.get_all_int('todo_id')
606         new_todos = self._form_data.get_all_int('new_todo')
607         comments = self._form_data.get_all_str('comment')
608         efforts = self._form_data.get_all_floats_or_nones('effort')
609         done_todos = self._form_data.get_all_int('done')
610         for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
611             raise BadFormatException('"done" field refers to unknown Todo')
612         is_done = [t_id in done_todos for t_id in old_todos]
613         if not (len(old_todos) == len(is_done) == len(comments)
614                 == len(efforts)):
615             msg = 'not equal number each of number of todo_id, comments, ' +\
616                     'and efforts inputs'
617             raise BadFormatException(msg)
618         day = Day.by_id_or_create(self.conn, date)
619         day.comment = day_comment
620         day.save(self.conn)
621         for process_id in sorted(new_todos):
622             if 'empty' == make_type:
623                 process = Process.by_id(self.conn, process_id)
624                 todo = Todo(None, process, False, date)
625                 todo.save(self.conn)
626             else:
627                 Todo.create_with_children(self.conn, process_id, date)
628         for i, todo_id in enumerate(old_todos):
629             todo = Todo.by_id(self.conn, todo_id)
630             todo.is_done = is_done[i]
631             todo.comment = comments[i]
632             todo.effort = efforts[i]
633             todo.save(self.conn)
634         return f'/day?date={date}&make_type={make_type}'
635
636     @_delete_or_post(Todo, '/')
637     def do_POST_todo(self, todo: Todo) -> str:
638         """Update Todo and its children."""
639         # pylint: disable=too-many-locals
640         # pylint: disable=too-many-branches
641         # pylint: disable=too-many-statements
642         adopted_child_ids = self._form_data.get_all_int('adopt')
643         processes_to_make_full = self._form_data.get_all_int('make_full')
644         processes_to_make_empty = self._form_data.get_all_int('make_empty')
645         fill_fors = self._form_data.get_first_strings_starting('fill_for_')
646         with_effort_post = True
647         try:
648             effort = self._form_data.get_float_or_none('effort')
649         except NotFoundException:
650             with_effort_post = False
651         conditions = self._form_data.get_all_int('conditions')
652         disables = self._form_data.get_all_int('disables')
653         blockers = self._form_data.get_all_int('blockers')
654         enables = self._form_data.get_all_int('enables')
655         is_done = len(self._form_data.get_all_str('done')) > 0
656         calendarize = len(self._form_data.get_all_str('calendarize')) > 0
657         comment = self._form_data.get_str('comment', ignore_strict=True)
658         for v in fill_fors.values():
659             target_id: int
660             for prefix in ['make_empty_', 'make_full_']:
661                 if v.startswith(prefix):
662                     try:
663                         target_id = int(v[len(prefix):])
664                     except ValueError as e:
665                         msg = 'bad fill_for target: {v}'
666                         raise BadFormatException(msg) from e
667                     continue
668             if v.startswith('make_empty_'):
669                 processes_to_make_empty += [target_id]
670             elif v.startswith('make_full_'):
671                 processes_to_make_full += [target_id]
672             elif v != 'ignore':
673                 adopted_child_ids += [int(v)]
674         to_remove = []
675         for child in todo.children:
676             assert isinstance(child.id_, int)
677             if child.id_ not in adopted_child_ids:
678                 to_remove += [child.id_]
679         for id_ in to_remove:
680             child = Todo.by_id(self.conn, id_)
681             todo.remove_child(child)
682         for child_id in adopted_child_ids:
683             if child_id in [c.id_ for c in todo.children]:
684                 continue
685             child = Todo.by_id(self.conn, child_id)
686             todo.add_child(child)
687         for process_id in processes_to_make_empty:
688             process = Process.by_id(self.conn, process_id)
689             made = Todo(None, process, False, todo.date)
690             made.save(self.conn)
691             todo.add_child(made)
692         for process_id in processes_to_make_full:
693             made = Todo.create_with_children(self.conn, process_id, todo.date)
694             todo.add_child(made)
695         if with_effort_post:
696             todo.effort = effort
697         todo.set_conditions(self.conn, conditions)
698         todo.set_blockers(self.conn, blockers)
699         todo.set_enables(self.conn, enables)
700         todo.set_disables(self.conn, disables)
701         todo.is_done = is_done
702         todo.calendarize = calendarize
703         todo.comment = comment
704         todo.save(self.conn)
705         return f'/todo?id={todo.id_}'
706
707     def do_POST_process_descriptions(self) -> str:
708         """Update history timestamps for Process.description."""
709         return self._change_versioned_timestamps(Process, 'description')
710
711     def do_POST_process_efforts(self) -> str:
712         """Update history timestamps for Process.effort."""
713         return self._change_versioned_timestamps(Process, 'effort')
714
715     def do_POST_process_titles(self) -> str:
716         """Update history timestamps for Process.title."""
717         return self._change_versioned_timestamps(Process, 'title')
718
719     @_delete_or_post(Process, '/processes')
720     def do_POST_process(self, process: Process) -> str:
721         """Update or insert Process of ?id= and fields defined in postvars."""
722         # pylint: disable=too-many-locals
723         # pylint: disable=too-many-statements
724         try:
725             title = self._form_data.get_str('title')
726             description = self._form_data.get_str('description')
727             effort = self._form_data.get_float('effort')
728         except NotFoundException as e:
729             raise BadFormatException from e
730         conditions = self._form_data.get_all_int('conditions')
731         blockers = self._form_data.get_all_int('blockers')
732         enables = self._form_data.get_all_int('enables')
733         disables = self._form_data.get_all_int('disables')
734         calendarize = self._form_data.get_all_str('calendarize') != []
735         suppresses = self._form_data.get_all_int('suppresses')
736         step_of = self._form_data.get_all_str('step_of')
737         keep_steps = self._form_data.get_all_int('keep_step')
738         step_ids = self._form_data.get_all_int('steps')
739         new_top_steps = self._form_data.get_all_str('new_top_step')
740         step_process_id_to = {}
741         step_parent_id_to = {}
742         new_steps_to = {}
743         for step_id in step_ids:
744             name = f'new_step_to_{step_id}'
745             new_steps_to[step_id] = self._form_data.get_all_int(name)
746         for step_id in keep_steps:
747             name = f'step_{step_id}_process_id'
748             step_process_id_to[step_id] = self._form_data.get_int(name)
749             name = f'step_{step_id}_parent_id'
750             step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
751         process.title.set(title)
752         process.description.set(description)
753         process.effort.set(effort)
754         process.set_conditions(self.conn, conditions)
755         process.set_blockers(self.conn, blockers)
756         process.set_enables(self.conn, enables)
757         process.set_disables(self.conn, disables)
758         process.calendarize = calendarize
759         process.save(self.conn)
760         assert isinstance(process.id_, int)
761         new_step_title = None
762         steps: list[ProcessStep] = []
763         for step_id in keep_steps:
764             if step_id not in step_ids:
765                 raise BadFormatException('trying to keep unknown step')
766             step = ProcessStep(step_id, process.id_,
767                                step_process_id_to[step_id],
768                                step_parent_id_to[step_id])
769             steps += [step]
770         for step_id in step_ids:
771             new = [ProcessStep(None, process.id_, step_process_id, step_id)
772                    for step_process_id in new_steps_to[step_id]]
773             steps += new
774         for step_identifier in new_top_steps:
775             try:
776                 step_process_id = int(step_identifier)
777                 step = ProcessStep(None, process.id_, step_process_id, None)
778                 steps += [step]
779             except ValueError:
780                 new_step_title = step_identifier
781         process.set_steps(self.conn, steps)
782         process.set_step_suppressions(self.conn, suppresses)
783         owners_to_set = []
784         new_owner_title = None
785         for owner_identifier in step_of:
786             try:
787                 owners_to_set += [int(owner_identifier)]
788             except ValueError:
789                 new_owner_title = owner_identifier
790         process.set_owners(self.conn, owners_to_set)
791         params = f'id={process.id_}'
792         if new_step_title:
793             title_b64_encoded = b64encode(new_step_title.encode()).decode()
794             params = f'step_to={process.id_}&title_b64={title_b64_encoded}'
795         elif new_owner_title:
796             title_b64_encoded = b64encode(new_owner_title.encode()).decode()
797             params = f'has_step={process.id_}&title_b64={title_b64_encoded}'
798         process.save(self.conn)
799         return f'/process?{params}'
800
801     def do_POST_condition_descriptions(self) -> str:
802         """Update history timestamps for Condition.description."""
803         return self._change_versioned_timestamps(Condition, 'description')
804
805     def do_POST_condition_titles(self) -> str:
806         """Update history timestamps for Condition.title."""
807         return self._change_versioned_timestamps(Condition, 'title')
808
809     @_delete_or_post(Condition, '/conditions')
810     def do_POST_condition(self, condition: Condition) -> str:
811         """Update/insert Condition of ?id= and fields defined in postvars."""
812         try:
813             is_active = self._form_data.get_str('is_active') == 'True'
814             title = self._form_data.get_str('title')
815             description = self._form_data.get_str('description')
816         except NotFoundException as e:
817             raise BadFormatException(e) from e
818         condition.is_active = is_active
819         condition.title.set(title)
820         condition.description.set(description)
821         condition.save(self.conn)
822         return f'/condition?id={condition.id_}'