home · contact · privacy
Remove unnecessary DB commit.
[plomtask] / plomtask / http.py
1 """Web server stuff."""
2 from typing import Any
3 from http.server import BaseHTTPRequestHandler
4 from http.server import HTTPServer
5 from urllib.parse import urlparse, parse_qs
6 from os.path import split as path_split
7 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
8 from plomtask.days import Day, todays_date
9 from plomtask.exceptions import HandledException, BadFormatException, \
10         NotFoundException
11 from plomtask.db import DatabaseConnection, DatabaseFile
12 from plomtask.processes import Process
13
# Directory handed to Jinja's FileSystemLoader as template search path; being
# a relative path, it presumably resolves against the working directory.
TEMPLATES_DIR = 'templates'
15
16
class TaskServer(HTTPServer):
    """HTTPServer that additionally carries a DB file and a Jinja Environment.

    Request handlers reach both through their .server attribute.
    """

    def __init__(self, db_file: DatabaseFile,
                 *args: Any, **kwargs: Any) -> None:
        """Initialize the HTTPServer part, then attach .jinja and .db.

        db_file is stored unchanged as .db; every further positional and
        keyword argument is passed on as-is to HTTPServer.__init__.
        """
        super().__init__(*args, **kwargs)
        template_loader = JinjaFSLoader(TEMPLATES_DIR)
        self.jinja = JinjaEnv(loader=template_loader)
        self.db = db_file
25
26
class TaskHandler(BaseHTTPRequestHandler):
    """Handles single HTTP request."""
    server: TaskServer

    def do_GET(self) -> None:
        """Handle any GET request.

        Dispatches on the last segment of the request path: 'calendar',
        'day', 'process' and 'processes' each render a page, the empty
        segment redirects to /day, anything else raises NotFoundException.
        Any HandledException raised on the way is answered via _send_msg
        with the exception's http_code.
        """
        conn: DatabaseConnection | None = None
        try:
            conn, site, params = self._init_handling()
            if 'calendar' == site:
                start = params.get('start', [''])[0]
                end = params.get('end', [''])[0]
                html = self.do_GET_calendar(conn, start, end)
            elif 'day' == site:
                date = params.get('date', [todays_date()])[0]
                html = self.do_GET_day(conn, date)
            elif 'process' == site:
                html = self.do_GET_process(conn, self._parse_id(params))
            elif 'processes' == site:
                html = self.do_GET_processes(conn)
            elif '' == site:
                self._redirect('/day')
                return
            else:
                raise NotFoundException(f'Unknown page: /{site}')
            self._send_html(html)
        except HandledException as error:
            self._send_msg(error, code=error.http_code)
        finally:
            # Close on every exit path (success, redirect, error) so that no
            # connection is left open; conn stays None if _init_handling
            # itself failed.
            if conn is not None:
                conn.close()

    def do_GET_calendar(self, conn: DatabaseConnection,
                        start: str, end: str) -> str:
        """Show Days.

        Renders calendar.html with all Days in date_range (start, end),
        requested with fill_gaps=True.
        """
        days = Day.all(conn, date_range=(start, end), fill_gaps=True)
        return self.server.jinja.get_template('calendar.html').render(
                days=days, start=start, end=end)

    def do_GET_day(self, conn: DatabaseConnection, date: str) -> str:
        """Show single Day, rendered through day.html."""
        day = Day.by_date(conn, date, create=True)
        return self.server.jinja.get_template('day.html').render(day=day)

    def do_GET_process(self, conn: DatabaseConnection, id_: int | None) -> str:
        """Show process of id_, rendered through process.html."""
        return self.server.jinja.get_template('process.html').render(
                process=Process.by_id(conn, id_, create=True))

    def do_GET_processes(self, conn: DatabaseConnection) -> str:
        """Show all Processes, rendered through processes.html."""
        return self.server.jinja.get_template('processes.html').render(
                processes=Process.all(conn))

    def do_POST(self) -> None:
        """Handle any POST request.

        Reads the form-encoded request body, applies it to the Day or Process
        addressed by path and query string, commits, and redirects to '/'.
        Any HandledException raised on the way is answered via _send_msg
        with the exception's http_code, and nothing is committed.
        """
        conn: DatabaseConnection | None = None
        try:
            conn, site, params = self._init_handling()
            postvars = self._read_postvars()
            if 'day' == site:
                date = params.get('date', [''])[0]
                self.do_POST_day(conn, date, postvars)
            elif 'process' == site:
                self.do_POST_process(conn, self._parse_id(params), postvars)
            # NOTE(review): any other path falls through, commits nothing new
            # and is redirected like a success -- confirm whether this should
            # rather be a NotFoundException as in do_GET.
            conn.commit()
            self._redirect('/')
        except HandledException as error:
            self._send_msg(error, code=error.http_code)
        finally:
            # Close on every exit path so that failed requests do not leave
            # their connection open.
            if conn is not None:
                conn.close()

    def do_POST_day(self, conn: DatabaseConnection,
                    date: str, postvars: dict[str, list[str]]) -> None:
        """Update or insert Day of date and fields defined in postvars.

        Raises BadFormatException if postvars lacks the 'comment' field.
        """
        day = Day.by_date(conn, date, create=True)
        day.comment = self._required_field(postvars, 'comment')
        day.save(conn)

    def do_POST_process(self, conn: DatabaseConnection, id_: int | None,
                        postvars: dict[str, list[str]]) -> None:
        """Update or insert Process of id_ and fields defined in postvars.

        Raises BadFormatException if postvars lacks any of 'title',
        'description', 'effort', or if setting the effort from its float
        conversion raises ValueError.
        """
        process = Process.by_id(conn, id_, create=True)
        process.title.set(self._required_field(postvars, 'title'))
        process.description.set(self._required_field(postvars, 'description'))
        effort = self._required_field(postvars, 'effort')
        try:
            process.effort.set(float(effort))
        except ValueError as e:
            raise BadFormatException(f'Bad effort value: {effort}') from e
        process.save(conn)

    @staticmethod
    def _parse_id(params: dict[str, list[str]]) -> int | None:
        """Return first ?id= value of params as int, None if absent or empty.

        Raises BadFormatException if the value is not an integer.
        """
        id_ = params.get('id', [''])[0]
        if not id_:
            return None
        try:
            return int(id_)
        except ValueError as e:
            raise BadFormatException(f'Bad ?id= value: {id_}') from e

    @staticmethod
    def _required_field(postvars: dict[str, list[str]], key: str) -> str:
        """Return first value of key in postvars.

        Raises BadFormatException instead of a bare KeyError if key is
        missing, so that the request is answered with an error message.
        """
        try:
            return postvars[key][0]
        except KeyError as e:
            raise BadFormatException(f'Missing form field: {key}') from e

    def _read_postvars(self) -> dict[str, list[str]]:
        """Read request body as sized by Content-Length, parse it as form data.

        Blank values are kept. Raises BadFormatException if the header is
        missing, not a non-negative integer, or the body is not decodable.
        """
        raw_length = self.headers['content-length']
        if raw_length is None:
            raise BadFormatException('Missing Content-Length header.')
        try:
            length = int(raw_length)
        except ValueError as e:
            raise BadFormatException(
                    f'Bad Content-Length header: {raw_length}') from e
        if length < 0:
            # A negative size would make read() consume the stream until EOF.
            raise BadFormatException(
                    f'Bad Content-Length header: {raw_length}')
        try:
            body = self.rfile.read(length).decode()
        except UnicodeDecodeError as e:
            raise BadFormatException('Request body is not UTF-8.') from e
        return parse_qs(body, keep_blank_values=True)

    def _init_handling(self) -> \
            tuple[DatabaseConnection, str, dict[str, list[str]]]:
        """Open DB connection, split request path into site and parameters.

        Returns the new connection on the server's DB file, the last segment
        of the URL path, and the parsed query string.
        """
        conn = DatabaseConnection(self.server.db)
        parsed_url = urlparse(self.path)
        site = path_split(parsed_url.path)[1]
        params = parse_qs(parsed_url.query)
        return conn, site, params

    def _redirect(self, target: str) -> None:
        """Send 302 response pointing the client to target."""
        self.send_response(302)
        self.send_header('Location', target)
        self.end_headers()

    def _send_html(self, html: str, code: int = 200) -> None:
        """Send HTML as proper HTTP response."""
        self.send_response(code)
        # Declare type and encoding, so clients need not guess how to decode
        # the UTF-8 bytes written below.
        self.send_header('Content-Type', 'text/html; charset=utf-8')
        self.end_headers()
        self.wfile.write(bytes(html, 'utf-8'))

    def _send_msg(self, msg: Exception, code: int = 400) -> None:
        """Send message in HTML formatting as HTTP response."""
        html = self.server.jinja.get_template('msg.html').render(msg=msg)
        self._send_html(html, code)