"""Collecting Day and date-related items."""
+from __future__ import annotations
from datetime import datetime, timedelta
from sqlite3 import Row
from plomtask.misc import HandledException
DATE_FORMAT = '%Y-%m-%d'
-def date_valid(date: str):
- """Validate date against DATE_FORMAT, return Datetime or None."""
+def valid_date(date_str: str) -> str:
+ """Validate date against DATE_FORMAT or 'today', return in DATE_FORMAT."""
+ if date_str == 'today':
+ date_str = todays_date()
try:
- result = datetime.strptime(date, DATE_FORMAT)
- except (ValueError, TypeError):
- return None
- return result
+ dt = datetime.strptime(date_str, DATE_FORMAT)
+ except (ValueError, TypeError) as e:
+ msg = f'Given date of wrong format: {date_str}'
+ raise HandledException(msg) from e
+ return dt.strftime(DATE_FORMAT)
-def todays_date():
+def todays_date() -> str:
"""Return current date in DATE_FORMAT."""
return datetime.now().strftime(DATE_FORMAT)
class Day:
"""Individual days defined by their dates."""
- def __init__(self, date: str, comment: str = ''):
- self.date = date
- self.datetime = date_valid(self.date)
- if not self.datetime:
- raise HandledException(f'Given date of wrong format: {self.date}')
+ def __init__(self, date: str, comment: str = '') -> None:
+ self.date = valid_date(date)
+ self.datetime = datetime.strptime(self.date, DATE_FORMAT)
self.comment = comment
- def __eq__(self, other: object):
+ def __eq__(self, other: object) -> bool:
return isinstance(other, self.__class__) and self.date == other.date
- def __lt__(self, other):
+ def __lt__(self, other: Day) -> bool:
return self.date < other.date
@classmethod
- def from_table_row(cls, row: Row):
+ def from_table_row(cls, row: Row) -> Day:
"""Make Day from database row."""
return cls(row[0], row[1])
@classmethod
def all(cls, db_conn: DatabaseConnection,
- date_range: tuple[str, str] = ('', ''), fill_gaps: bool = False):
+ date_range: tuple[str, str] = ('', ''),
+ fill_gaps: bool = False) -> list[Day]:
"""Return list of Days in database within date_range."""
- def date_from_range_str(date_str: str, default: str):
- if date_str == '':
- date_str = default
- elif date_str == 'today':
- date_str = todays_date()
- elif not date_valid(date_str):
- raise HandledException(f'Bad date: {date_str}')
- return date_str
- start_date = date_from_range_str(date_range[0], '2024-01-01')
- end_date = date_from_range_str(date_range[1], '2030-01-01')
+ min_date = '2024-01-01'
+ max_date = '2030-12-31'
+ start_date = valid_date(date_range[0] if date_range[0] else min_date)
+ end_date = valid_date(date_range[1] if date_range[1] else max_date)
days = []
sql = 'SELECT * FROM days WHERE date >= ? AND date <= ?'
for row in db_conn.exec(sql, (start_date, end_date)):
@classmethod
def by_date(cls, db_conn: DatabaseConnection,
- date: str, create: bool = False):
+ date: str, create: bool = False) -> Day:
"""Retrieve Day by date if in DB, else return None."""
for row in db_conn.exec('SELECT * FROM days WHERE date = ?', (date,)):
return cls.from_table_row(row)
- if create:
- return cls(date)
- return None
+ if not create:
+ raise HandledException(f'Day not found for date: {date}')
+ return cls(date)
@property
- def weekday(self):
+ def weekday(self) -> str:
"""Return what weekday matches self.date."""
return self.datetime.strftime('%A')
@property
- def prev_date(self):
+ def prev_date(self) -> str:
"""Return date preceding date of this Day."""
prev_datetime = self.datetime - timedelta(days=1)
return prev_datetime.strftime(DATE_FORMAT)
@property
- def next_date(self):
+ def next_date(self) -> str:
"""Return date succeeding date of this Day."""
next_datetime = self.datetime + timedelta(days=1)
return next_datetime.strftime(DATE_FORMAT)
- def save(self, db_conn: DatabaseConnection):
+ def save(self, db_conn: DatabaseConnection) -> None:
"""Add (or re-write) self to database."""
db_conn.exec('REPLACE INTO days VALUES (?, ?)',
(self.date, self.comment))
"""Database management."""
from os.path import isfile
from difflib import Differ
-from sqlite3 import connect as sql_connect
+from sqlite3 import connect as sql_connect, Cursor
+from typing import Any
from plomtask.misc import HandledException
PATH_DB_SCHEMA = 'scripts/init.sql'
class DatabaseFile: # pylint: disable=too-few-public-methods
"""Represents the sqlite3 database's file."""
- def __init__(self, path):
+ def __init__(self, path: str) -> None:
self.path = path
self._check()
- def remake(self):
+ def remake(self) -> None:
"""Create tables in self.path file as per PATH_DB_SCHEMA sql file."""
with sql_connect(self.path) as conn:
with open(PATH_DB_SCHEMA, 'r', encoding='utf-8') as f:
conn.executescript(f.read())
self._check()
- def _check(self):
+ def _check(self) -> None:
"""Check file exists and is of proper schema."""
self.exists = isfile(self.path)
if self.exists:
self._validate_schema()
- def _validate_schema(self):
+ def _validate_schema(self) -> None:
"""Compare found schema with what's stored at PATH_DB_SCHEMA."""
sql_for_schema = 'SELECT sql FROM sqlite_master ORDER BY sql'
msg_err = 'Database has wrong tables schema. Diff:\n'
class DatabaseConnection:
"""A single connection to the database."""
- def __init__(self, db_file: DatabaseFile):
+ def __init__(self, db_file: DatabaseFile) -> None:
self.file = db_file
self.conn = sql_connect(self.file.path)
- def commit(self):
+ def commit(self) -> None:
"""Commit SQL transaction."""
self.conn.commit()
- def exec(self, code: str, inputs: tuple = ()):
+ def exec(self, code: str, inputs: tuple[Any, ...] = tuple()) -> Cursor:
"""Add commands to SQL transaction."""
return self.conn.execute(code, inputs)
- def close(self):
+ def close(self) -> None:
"""Close DB connection."""
self.conn.close()
"""Web server stuff."""
+from typing import Any
from http.server import BaseHTTPRequestHandler
from http.server import HTTPServer
from urllib.parse import urlparse, parse_qs
from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
from plomtask.days import Day, todays_date
from plomtask.misc import HandledException
-from plomtask.db import DatabaseConnection
+from plomtask.db import DatabaseConnection, DatabaseFile
from plomtask.processes import Process
TEMPLATES_DIR = 'templates'
class TaskServer(HTTPServer):
"""Variant of HTTPServer that knows .jinja as Jinja Environment."""
- def __init__(self, db_file, *args, **kwargs):
+ def __init__(self, db_file: DatabaseFile,
+ *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.db = db_file
self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
"""Handles single HTTP request."""
server: TaskServer
- def do_GET(self):
+ def do_GET(self) -> None:
"""Handle any GET request."""
try:
conn, site, params = self._init_handling()
elif 'process' == site:
id_ = params.get('id', [None])[0]
try:
- id_ = int(id_) if id_ else None
+ id__ = int(id_) if id_ else None
except ValueError as e:
raise HandledException(f'Bad ?id= value: {id_}') from e
- html = self.do_GET_process(conn, id_)
+ html = self.do_GET_process(conn, id__)
elif 'processes' == site:
html = self.do_GET_processes(conn)
else:
except HandledException as error:
self._send_msg(error)
- def do_GET_calendar(self, conn: DatabaseConnection, start: str, end: str):
+ def do_GET_calendar(self, conn: DatabaseConnection,
+ start: str, end: str) -> str:
"""Show Days."""
days = Day.all(conn, date_range=(start, end), fill_gaps=True)
return self.server.jinja.get_template('calendar.html').render(
days=days, start=start, end=end)
- def do_GET_day(self, conn: DatabaseConnection, date: str):
+ def do_GET_day(self, conn: DatabaseConnection, date: str) -> str:
"""Show single Day."""
day = Day.by_date(conn, date, create=True)
return self.server.jinja.get_template('day.html').render(day=day)
- def do_GET_process(self, conn: DatabaseConnection, id_: int | None):
+ def do_GET_process(self, conn: DatabaseConnection, id_: int | None) -> str:
"""Show process of id_."""
return self.server.jinja.get_template('process.html').render(
process=Process.by_id(conn, id_, create=True))
- def do_GET_processes(self, conn: DatabaseConnection):
+ def do_GET_processes(self, conn: DatabaseConnection) -> str:
"""Show all Processes."""
return self.server.jinja.get_template('processes.html').render(
processes=Process.all(conn))
- def do_POST(self):
+ def do_POST(self) -> None:
"""Handle any POST request."""
try:
conn, site, params = self._init_handling()
length = int(self.headers['content-length'])
postvars = parse_qs(self.rfile.read(length).decode(),
- keep_blank_values=1)
+ keep_blank_values=True)
if 'day' == site:
- date = params.get('date', [None])[0]
+ date = params.get('date', [''])[0]
self.do_POST_day(conn, date, postvars)
elif 'process' == site:
- id_ = params.get('id', [None])[0]
+ id_ = params.get('id', [''])[0]
try:
- id_ = int(id_) if id_ else None
+ id__ = int(id_) if id_ else None
except ValueError as e:
raise HandledException(f'Bad ?id= value: {id_}') from e
- self.do_POST_process(conn, id_, postvars)
+ self.do_POST_process(conn, id__, postvars)
conn.commit()
conn.close()
self._redirect('/')
except HandledException as error:
self._send_msg(error)
- def do_POST_day(self, conn: DatabaseConnection, date: str, postvars: dict):
+ def do_POST_day(self, conn: DatabaseConnection,
+ date: str, postvars: dict[str, list[str]]) -> None:
"""Update or insert Day of date and fields defined in postvars."""
day = Day.by_date(conn, date, create=True)
day.comment = postvars['comment'][0]
day.save(conn)
def do_POST_process(self, conn: DatabaseConnection, id_: int | None,
- postvars: dict):
+ postvars: dict[str, list[str]]) -> None:
"""Update or insert Process of id_ and fields defined in postvars."""
process = Process.by_id(conn, id_, create=True)
- if process:
- process.title.set(postvars['title'][0])
- process.description.set(postvars['description'][0])
- effort = postvars['effort'][0]
- try:
- process.effort.set(float(effort))
- except ValueError as e:
- raise HandledException(f'Bad effort value: {effort}') from e
- process.save(conn)
-
- def _init_handling(self):
+ process.title.set(postvars['title'][0])
+ process.description.set(postvars['description'][0])
+ effort = postvars['effort'][0]
+ try:
+ process.effort.set(float(effort))
+ except ValueError as e:
+ raise HandledException(f'Bad effort value: {effort}') from e
+ process.save(conn)
+
+ def _init_handling(self) -> \
+ tuple[DatabaseConnection, str, dict[str, list[str]]]:
conn = DatabaseConnection(self.server.db)
parsed_url = urlparse(self.path)
site = path_split(parsed_url.path)[1]
params = parse_qs(parsed_url.query)
return conn, site, params
- def _redirect(self, target: str):
+ def _redirect(self, target: str) -> None:
self.send_response(302)
self.send_header('Location', target)
self.end_headers()
- def _send_html(self, html: str, code: int = 200):
+ def _send_html(self, html: str, code: int = 200) -> None:
"""Send HTML as proper HTTP response."""
self.send_response(code)
self.end_headers()
self.wfile.write(bytes(html, 'utf-8'))
- def _send_msg(self, msg: str, code: int = 400):
+ def _send_msg(self, msg: Exception, code: int = 400) -> None:
"""Send message in HTML formatting as HTTP response."""
html = self.server.jinja.get_template('msg.html').render(msg=msg)
self._send_html(html, code)
from sqlite3 import Row
from datetime import datetime
from plomtask.db import DatabaseConnection
+from plomtask.misc import HandledException
class Process:
return list(processes.values())
@classmethod
- def by_id(cls, db_conn: DatabaseConnection,
- id_: int | None, create: bool = False) -> Process | None:
+ def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
+ create: bool = False) -> Process:
"""Collect all Processes and their connected VersionedAttributes."""
process = None
for row in db_conn.exec('SELECT * FROM processes '
'WHERE id = ?', (id_,)):
process = cls(row[0])
break
- if create and not process:
+ if not process:
+ if not create:
+ raise HandledException(f'Process not found of id: {id_}')
process = Process(id_)
if process:
for row in db_conn.exec('SELECT * FROM process_titles '
set -e
for dir in $(echo '.' 'plomtask' 'tests'); do
echo "Running mypy on ${dir}/ …."
- python3 -m mypy ${dir}/*.py
+ python3 -m mypy --strict ${dir}/*.py
echo "Running flake8 on ${dir}/ …"
python3 -m flake8 ${dir}/*.py
echo "Running pylint on ${dir}/ …"
"""Test Days module."""
from unittest import TestCase
-from http.client import HTTPConnection
from datetime import datetime
from tests.utils import TestCaseWithDB, TestCaseWithServer
from plomtask.days import Day, todays_date
class TestsSansDB(TestCase):
"""Days module tests not requiring DB setup."""
- def test_Day_dates(self):
+ def test_Day_dates(self) -> None:
"""Test Day's date format."""
with self.assertRaises(HandledException):
Day('foo')
- with self.assertRaises(HandledException):
- Day(None)
- with self.assertRaises(HandledException):
- Day(3)
with self.assertRaises(HandledException):
Day('2024-02-30')
with self.assertRaises(HandledException):
Day('2024-02-01 23:00:00')
self.assertEqual(datetime(2024, 1, 1), Day('2024-01-01').datetime)
- def test_Day_sorting(self):
+ def test_Day_sorting(self) -> None:
"""Test Day.__lt__."""
day1 = Day('2024-01-01')
day2 = Day('2024-01-02')
days = [day3, day1, day2]
self.assertEqual(sorted(days), [day1, day2, day3])
- def test_Day_weekday(self):
+ def test_Day_weekday(self) -> None:
"""Test Day.weekday."""
self.assertEqual(Day('2024-03-17').weekday, 'Sunday')
class TestsWithDB(TestCaseWithDB):
"""Days module tests not requiring DB setup."""
- def test_Day_by_date(self):
+ def test_Day_by_date(self) -> None:
"""Test Day.by_date()."""
- self.assertEqual(None, Day.by_date(self.db_conn, '2024-01-01'))
+ with self.assertRaises(HandledException):
+ Day.by_date(self.db_conn, '2024-01-01')
Day('2024-01-01').save(self.db_conn)
self.assertEqual(Day('2024-01-01'),
Day.by_date(self.db_conn, '2024-01-01'))
- self.assertEqual(None,
- Day.by_date(self.db_conn, '2024-01-02'))
+ with self.assertRaises(HandledException):
+ Day.by_date(self.db_conn, '2024-01-02')
self.assertEqual(Day('2024-01-02'),
Day.by_date(self.db_conn, '2024-01-02', create=True))
- def test_Day_all(self):
+ def test_Day_all(self) -> None:
"""Test Day.all(), especially in regards to date range filtering."""
day1 = Day('2024-01-01')
day2 = Day('2024-01-02')
day1.save(self.db_conn)
day2.save(self.db_conn)
day3.save(self.db_conn)
- self.assertEqual(Day.all(self.db_conn),
- [day1, day2, day3])
+ self.assertEqual(Day.all(self.db_conn), [day1, day2, day3])
self.assertEqual(Day.all(self.db_conn, ('', '')),
[day1, day2, day3])
self.assertEqual(Day.all(self.db_conn, ('2024-01-01', '2024-01-03')),
today.save(self.db_conn)
self.assertEqual(Day.all(self.db_conn, ('today', 'today')), [today])
- def test_Day_neighbor_dates(self):
+ def test_Day_neighbor_dates(self) -> None:
"""Test Day.prev_date and Day.next_date."""
self.assertEqual(Day('2024-01-01').prev_date, '2023-12-31')
self.assertEqual(Day('2023-02-28').next_date, '2023-03-01')
class TestsWithServer(TestCaseWithServer):
"""Tests against our HTTP server/handler (and database)."""
- def test_do_GET(self):
+ def test_do_GET(self) -> None:
"""Test /day and /calendar response codes."""
- http_conn = HTTPConnection(*self.httpd.server_address)
- http_conn.request('GET', '/day')
- self.assertEqual(http_conn.getresponse().status, 200)
- http_conn.request('GET', '/day?date=3000-01-01')
- self.assertEqual(http_conn.getresponse().status, 200)
- http_conn.request('GET', '/day?date=FOO')
- self.assertEqual(http_conn.getresponse().status, 400)
- http_conn.request('GET', '/calendar')
- self.assertEqual(http_conn.getresponse().status, 200)
- http_conn.request('GET', '/calendar?start=&end=')
- self.assertEqual(http_conn.getresponse().status, 200)
- http_conn.request('GET', '/calendar?start=today&end=today')
- self.assertEqual(http_conn.getresponse().status, 200)
- http_conn.request('GET', '/calendar?start=2024-01-01&end=2025-01-01')
- self.assertEqual(http_conn.getresponse().status, 200)
- http_conn.request('GET', '/calendar?start=foo')
- self.assertEqual(http_conn.getresponse().status, 400)
+ self.conn.request('GET', '/day')
+ self.assertEqual(self.conn.getresponse().status, 200)
+ self.conn.request('GET', '/day?date=3000-01-01')
+ self.assertEqual(self.conn.getresponse().status, 200)
+ self.conn.request('GET', '/day?date=FOO')
+ self.assertEqual(self.conn.getresponse().status, 400)
+ self.conn.request('GET', '/calendar')
+ self.assertEqual(self.conn.getresponse().status, 200)
+ self.conn.request('GET', '/calendar?start=&end=')
+ self.assertEqual(self.conn.getresponse().status, 200)
+ self.conn.request('GET', '/calendar?start=today&end=today')
+ self.assertEqual(self.conn.getresponse().status, 200)
+ self.conn.request('GET', '/calendar?start=2024-01-01&end=2025-01-01')
+ self.assertEqual(self.conn.getresponse().status, 200)
+ self.conn.request('GET', '/calendar?start=foo')
+ self.assertEqual(self.conn.getresponse().status, 400)
"""Test Processes module."""
from unittest import TestCase
-from http.client import HTTPConnection
from urllib.parse import urlencode
from tests.utils import TestCaseWithDB, TestCaseWithServer
from plomtask.processes import Process
+from plomtask.misc import HandledException
class TestsSansDB(TestCase):
"""Module tests not requiring DB setup."""
- def test_Process_versioned_defaults(self):
+ def test_Process_versioned_defaults(self) -> None:
"""Test defaults of Process' VersionedAttributes."""
self.assertEqual(Process(None).title.newest, 'UNNAMED')
self.assertEqual(Process(None).description.newest, '')
class TestsWithDB(TestCaseWithDB):
"""Mdule tests not requiring DB setup."""
- def test_Process_save(self):
+ def test_Process_save(self) -> None:
"""Test Process.save()."""
p_saved = Process(None)
p_saved.save(self.db_conn)
p_loaded = Process.by_id(self.db_conn, p_saved.id_)
self.assertEqual(p_saved.title.history, p_loaded.title.history)
- def test_Process_by_id(self):
+ def test_Process_by_id(self) -> None:
"""Test Process.by_id()."""
- self.assertEqual(None, Process.by_id(self.db_conn, None, create=False))
- self.assertEqual(None, Process.by_id(self.db_conn, 0, create=False))
- self.assertEqual(None, Process.by_id(self.db_conn, 1, create=False))
+ with self.assertRaises(HandledException):
+ Process.by_id(self.db_conn, None, create=False)
+ with self.assertRaises(HandledException):
+ Process.by_id(self.db_conn, 0, create=False)
+ with self.assertRaises(HandledException):
+ Process.by_id(self.db_conn, 1, create=False)
self.assertNotEqual(Process(1).id_,
Process.by_id(self.db_conn, None, create=True).id_)
self.assertNotEqual(Process(1).id_,
self.assertEqual(Process(2).id_,
Process.by_id(self.db_conn, 2, create=True).id_)
- def test_Process_all(self):
+ def test_Process_all(self) -> None:
"""Test Process.all()."""
p_1 = Process(None)
p_1.save(self.db_conn)
class TestsWithServer(TestCaseWithServer):
"""Module tests against our HTTP server/handler (and database)."""
- def test_do_POST_process(self):
+ def test_do_POST_process(self) -> None:
"""Test POST /process and its effect on the database."""
- def post_data_to_expect(form_data: dict, to_: str, expect: int):
+ def post_data_to_expect(form_data: dict[str, object],
+ to_: str, expect: int) -> None:
encoded_form_data = urlencode(form_data).encode('utf-8')
headers = {'Content-Type': 'application/x-www-form-urlencoded',
'Content-Length': str(len(encoded_form_data))}
- http_conn.request('POST', to_,
+ self.conn.request('POST', to_,
body=encoded_form_data, headers=headers)
- self.assertEqual(http_conn.getresponse().status, expect)
- http_conn = HTTPConnection(*self.httpd.server_address)
+ self.assertEqual(self.conn.getresponse().status, expect)
form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.0}
post_data_to_expect(form_data, '/process?id=FOO', 400)
form_data['effort'] = 'foo'
self.assertEqual([p.id_ for p in Process.all(self.db_conn)],
[retrieved.id_])
- def test_do_GET(self):
+ def test_do_GET(self) -> None:
"""Test /process and /processes response codes."""
- http_conn = HTTPConnection(*self.httpd.server_address)
- http_conn.request('GET', '/process')
- self.assertEqual(http_conn.getresponse().status, 200)
- http_conn.request('GET', '/process?id=')
- self.assertEqual(http_conn.getresponse().status, 200)
- http_conn.request('GET', '/process?id=0')
- self.assertEqual(http_conn.getresponse().status, 200)
- http_conn.request('GET', '/process?id=FOO')
- self.assertEqual(http_conn.getresponse().status, 400)
- http_conn.request('GET', '/processes')
- self.assertEqual(http_conn.getresponse().status, 200)
+ self.conn.request('GET', '/process')
+ self.assertEqual(self.conn.getresponse().status, 200)
+ self.conn.request('GET', '/process?id=')
+ self.assertEqual(self.conn.getresponse().status, 200)
+ self.conn.request('GET', '/process?id=0')
+ self.assertEqual(self.conn.getresponse().status, 200)
+ self.conn.request('GET', '/process?id=FOO')
+ self.assertEqual(self.conn.getresponse().status, 400)
+ self.conn.request('GET', '/processes')
+ self.assertEqual(self.conn.getresponse().status, 200)
"""Shared test utilities."""
from unittest import TestCase
from threading import Thread
+from http.client import HTTPConnection
from datetime import datetime
from os import remove as remove_file
from plomtask.db import DatabaseFile, DatabaseConnection
class TestCaseWithDB(TestCase):
"""Module tests not requiring DB setup."""
- def setUp(self):
+ def setUp(self) -> None:
timestamp = datetime.now().timestamp()
self.db_file = DatabaseFile(f'test_db:{timestamp}')
self.db_file.remake()
self.db_conn = DatabaseConnection(self.db_file)
- def tearDown(self):
+ def tearDown(self) -> None:
self.db_conn.close()
remove_file(self.db_file.path)
class TestCaseWithServer(TestCaseWithDB):
"""Module tests against our HTTP server/handler (and database)."""
- def setUp(self):
+ def setUp(self) -> None:
super().setUp()
self.httpd = TaskServer(self.db_file, ('localhost', 0), TaskHandler)
self.server_thread = Thread(target=self.httpd.serve_forever)
self.server_thread.daemon = True
self.server_thread.start()
+ self.conn = HTTPConnection(str(self.httpd.server_address[0]),
+ self.httpd.server_address[1])
- def tearDown(self):
+ def tearDown(self) -> None:
self.httpd.shutdown()
self.httpd.server_close()
self.server_thread.join()