From 743dbe0d493ddeb47eca981fa5be6d78e4d754c9 Mon Sep 17 00:00:00 2001
From: Christian Heller <c.heller@plomlompom.de>
Date: Wed, 15 Jan 2025 15:10:20 +0100
Subject: [PATCH 1/3] First commit.

---
 db.py | 192 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 192 insertions(+)
 create mode 100644 db.py

diff --git a/db.py b/db.py
new file mode 100644
index 0000000..e13ac14
--- /dev/null
+++ b/db.py
@@ -0,0 +1,192 @@
+"""Database management."""
+from difflib import Differ
+from pathlib import Path
+from sqlite3 import connect as sql_connect, Cursor as DbCursor
+from typing import Any, Callable, Literal, Optional, Self, TypeVar
+from abc import ABC, abstractmethod
+
+
+_SQL_DB_VERSION = 'PRAGMA user_version'
+TypePlomDbMigration = TypeVar('TypePlomDbMigration', bound='PlomDbMigration')
+TypePlomDbFile = TypeVar('TypePlomDbFile', bound='PlomDbFile')
+
+
+class PlomDbException(Exception):
+    """Collects 1) a terse machine-readable name, 2) human-friendly message."""
+
+    def __init__(self, name: str, *args: Any, msg: str = '', **kwargs: Any
+                 ) -> None:
+        super().__init__(*args, **kwargs)
+        self.name = name
+        self.msg = msg
+
+
+class PlomDbFile:
+    """File readable as DB of expected schema, user version."""
+    indent_n: int = 4
+    target_version: int
+    path_schema: Path
+    default_path: Path
+    mig_class: type['PlomDbMigration']
+
+    def __init__(self,
+                 path: Optional[Path] = None,
+                 skip_validations: bool = False
+                 ) -> None:
+        self.path = path if path else self.default_path
+        if not self.path.is_file():
+            raise PlomDbException('no_is_file', f'no DB file at {self.path}')
+        if skip_validations:
+            return
+        if (user_version := self._get_user_version()) != self.target_version:
+            raise PlomDbException(
+                'bad_version',
+                f'wrong DB version {user_version} (!= {self.target_version})')
+        with PlomDbConn(self) as conn:
+            self._validate_schema(conn)
+
+    @classmethod
+    def _validate_schema(cls, conn: 'PlomDbConn') -> None:
+        sch_rows_normed = []
+        indent = cls.indent_n * ' '
+        for row in [
+                r[0] for r in conn.exec(
+                    'SELECT sql FROM sqlite_master ORDER BY sql')
+                if r[0]]:
+            row_normed = []
+            for subrow in [sr.rstrip() for sr in row.split('\n')]:
+                in_parentheses = 0
+                split_at = []
+                for i, c in enumerate(subrow):
+                    if '(' == c:
+                        in_parentheses += 1
+                    elif ')' == c:
+                        in_parentheses -= 1
+                    elif ',' == c and 0 == in_parentheses:
+                        split_at += [i + 1]
+                prev_split = 0
+                for i in split_at:
+                    if segment := subrow[prev_split:i].strip():
+                        row_normed += [f'{indent}{segment}']
+                    prev_split = i
+                if segment := subrow[prev_split:].strip():
+                    row_normed += [f'{indent}{segment}']
+            row_normed[0] = row_normed[0].lstrip()  # no indent for opening …
+            row_normed[-1] = row_normed[-1].lstrip()  # … and closing line
+            if row_normed[-1] != ')' and row_normed[-3][-1] != ',':
+                row_normed[-3] = row_normed[-3] + ','
+                row_normed[-2:] = [indent + row_normed[-1][:-1]] + [')']
+            row_normed[-1] = row_normed[-1] + ';'
+            sch_rows_normed += row_normed
+        expected_rows =\
+            cls.path_schema.read_text(encoding='utf8').rstrip().splitlines()
+        if expected_rows != sch_rows_normed:
+            raise PlomDbException(
+               'bad_schema',
+               'Unexpected tables schema. Diff to {cls.path_schema}:\n'
+               + '\n'.join(Differ().compare(sch_rows_normed, expected_rows)))
+
+    def _get_user_version(self) -> int:
+        with sql_connect(self.path) as conn:
+            val = list(conn.execute(_SQL_DB_VERSION))[0][0]
+            assert isinstance(val, int)
+            return val
+
+    @classmethod
+    def create(cls, path_db: Optional[Path] = None) -> None:
+        """Create DB file at path_db according to file at self.path_schema.."""
+        path_db = path_db if path_db else cls.default_path
+        if path_db.exists():
+            raise PlomDbException('no_create_path_exists',
+                                  f'There already exists a node at {path_db}.')
+        if not path_db.parent.is_dir():
+            raise PlomDbException(
+                    'no_create_no_dir',
+                    f'No directory {path_db.parent} found to write into.')
+        with sql_connect(path_db) as conn:
+            conn.executescript(cls.path_schema.read_text(encoding='utf8'))
+            conn.execute(f'{_SQL_DB_VERSION} = {cls.target_version}')
+
+    def migrate(self, migrations: set[TypePlomDbMigration]) -> None:
+        """Migrate towards .target_version, following migrations."""
+        from_version = self._get_user_version()
+        if from_version >= self.target_version:
+            raise PlomDbException(
+                    'no_migrate_path',
+                    f'No migrating {from_version} to {self.target_version}.')
+        with PlomDbConn(self) as conn:
+            for migration in self.mig_class.gather(from_version, migrations):
+                migration.perform(conn)
+            self._validate_schema(conn)
+            conn.commit()
+
+
+class PlomDbConn:
+    """SQL connection to PlomDbFile."""
+    default_path: Path
+
+    def __init__(self, db_file: Optional[TypePlomDbFile] = None) -> None:
+        self._conn = sql_connect(
+                db_file.path if db_file else self.default_path,
+                autocommit=False)
+        # additional sqlite3.Connection shortcuts beyond .exec
+        self.exec_script = self._conn.executescript
+        self.commit = self._conn.commit
+
+    def __enter__(self) -> Self:  # context manager entry
+        return self
+
+    def __exit__(self, *_: Any) -> Literal[False]:  # context manager exit
+        self._conn.close()
+        return False
+
+    def exec(self,
+             sql: str,
+             inputs: tuple[Any, ...] = tuple(),
+             build_q_marks: bool = True
+             ) -> DbCursor:
+        """Wraps sqlite3.Connection.execute, appends (!) len(inputs) '?'s."""
+        if len(inputs) > 0:
+            if build_q_marks:
+                q_marks = ('?' if len(inputs) == 1
+                           else '(' + ','.join(['?'] * len(inputs)) + ')')
+                return self._conn.execute(f'{sql} {q_marks}', inputs)
+            return self._conn.execute(sql, inputs)
+        return self._conn.execute(sql)
+
+
+class PlomDbMigration(ABC):
+    """Collects and enacts PlomDbFile migration commands."""
+    migs_dir_path: Path = Path()
+
+    def __init__(self,
+                 target_version: int,
+                 sql_path: Optional[Path] = None,
+                 post_sql_steps: Optional[Callable] = None
+                 ) -> None:
+        if sql_path:
+            start_tok = sql_path.name.split('_', maxsplit=1)[0]
+            if (not start_tok.isdigit()) or int(start_tok) != target_version:
+                raise PlomDbException(
+                    'no_migrate_bad_path',
+                    f'bad path {sql_path} for migration to {target_version}')
+        self.target_version = target_version
+        self._sql_path = sql_path
+        self._post_sql_steps = post_sql_steps
+
+    def perform(self, conn: PlomDbConn) -> None:
+        """Do ._sql_path script and ._post_sql_steps, set .target_version."""
+        if self._sql_path:
+            sql_path = self.__class__.migs_dir_path.joinpath(self._sql_path)
+            conn.exec_script(sql_path.read_text(encoding='utf8'))
+        if self._post_sql_steps:
+            self._post_sql_steps(conn)
+        conn.exec(f'{_SQL_DB_VERSION} = {self.target_version}')
+
+    @classmethod
+    @abstractmethod
+    def gather(cls,
+               from_version: int,
+               base_set: set[TypePlomDbMigration]
+               ) -> list[TypePlomDbMigration]:
+        """Return sorted list of migrations to perform."""
-- 
2.30.2


From e7202fcfd78c6a60bd90da789a68c8ec4baf7b1a Mon Sep 17 00:00:00 2001
From: Christian Heller <c.heller@plomlompom.de>
Date: Sat, 18 Jan 2025 03:25:29 +0100
Subject: [PATCH 2/3] Add web stuff.

---
 web.py | 107 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 107 insertions(+)
 create mode 100644 web.py

diff --git a/web.py b/web.py
new file mode 100644
index 0000000..629e63b
--- /dev/null
+++ b/web.py
@@ -0,0 +1,107 @@
+"""HTTP services."""
+from http.server import HTTPServer, BaseHTTPRequestHandler
+from json import loads as json_loads
+from pathlib import Path
+from typing import Any, Generic, Optional, Type, TypeVar
+from urllib.parse import parse_qs, urlparse
+from jinja2 import (
+        Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader)
+
+
+MIME_APP_JSON = 'application/json'
+
+
+Server = TypeVar('Server', bound='PlomHttpServer')
+QueryMap = TypeVar('QueryMap', bound='PlomQueryMap')
+
+
+class PlomHttpServer(HTTPServer):
+    """Basic HTTP server."""
+
+    def __init__(self, templates_dir: Path, *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.jinja = JinjaEnv(loader=JinjaFSLoader(templates_dir))
+
+    def serve(self) -> None:
+        """Do .serve_forever on .server_port/.server_address until ^C."""
+        print(f'running at port {self.server_port}')
+        try:
+            self.serve_forever()
+        except KeyboardInterrupt:
+            print('aborted due to keyboard interrupt')
+        self.server_close()
+
+
+class PlomHttpHandler(BaseHTTPRequestHandler, Generic[Server, QueryMap]):
+    """Basic HTTP request handler for use with PlomHttpServer."""
+    server: Server
+    params: QueryMap
+    path_toks: tuple[str, ...]
+    postvars: QueryMap
+    mapper: Type[QueryMap]
+    pagename: str
+
+    def parse_request(self) -> bool:
+        """Extend by setup of .params, .path_toks."""
+        if not super().parse_request():
+            return False
+        url = urlparse(self.path)
+        self.params = self.mapper(url.query)
+        self.path_toks = Path(url.path).parts
+        self.pagename = self.path_toks[1] if len(self.path_toks) > 1 else ''
+        self.postvars = self.mapper(
+                self.rfile.read(int(self.headers.get('content-length', 0))
+                                ).decode('utf8'),
+                self.headers['content-type'])
+        return True
+
+    def send_http(self,
+                  content: bytes = b'',
+                  headers: Optional[list[tuple[str, str]]] = None,
+                  code: int = 200
+                  ) -> None:
+        """Send HTTP response."""
+        headers = headers if headers else []
+        self.send_response(code)
+        for header in headers:
+            self.send_header(*header)
+        self.end_headers()
+        if content:
+            self.wfile.write(content)
+
+    def send_rendered(self,
+                      tmpl_name: Path,
+                      ctx: dict[str, Any],
+                      code: int = 200
+                      ) -> None:
+        """Send rendered page of tmpl_name template, with ctx data."""
+        tmpl = self.server.jinja.get_template(str(tmpl_name))
+        self.send_http(bytes(tmpl.render(**ctx), encoding='utf8'), code=code)
+
+    def redirect(self, target: Path) -> None:
+        """Redirect browser to target."""
+        self.send_http(headers=[('Location', str(target))], code=302)
+
+
+class PlomQueryMap:
+    """Convenience wrapper over somewhat dict-shaped HTTP inputs."""
+
+    def __init__(self, serialized: str, content_type: str = '') -> None:
+        if content_type == MIME_APP_JSON:
+            self.as_dict = json_loads(serialized)
+        else:
+            self.as_dict = parse_qs(serialized, keep_blank_values=True,
+                                    strict_parsing=True, errors='strict')
+
+    def all(self, key: str) -> Optional[list[str]]:
+        """Return all values mapped to key, None if key not present."""
+        return self.as_dict.get(key, None)
+
+    def first(self, key: str) -> Optional[str]:
+        """Return first value mapped to key, if found, else None."""
+        values = self.all(key)
+        return values[0] if values else None
+
+    def keys_prefixed(self, prefix: str) -> tuple[str, ...]:
+        """Return all present keys starting with prefix."""
+        return tuple(k for k in self.as_dict if k.startswith(prefix))
-- 
2.30.2


From dee7c0f6218e6bdd07b477dc5d9e4b5540ffcf4a Mon Sep 17 00:00:00 2001
From: Christian Heller <c.heller@plomlompom.de>
Date: Sun, 26 Jan 2025 11:20:12 +0100
Subject: [PATCH 3/3] Enable autoescape for web server's Jinja environment.

---
 web.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/web.py b/web.py
index 629e63b..813d0fc 100644
--- a/web.py
+++ b/web.py
@@ -20,7 +20,8 @@ class PlomHttpServer(HTTPServer):
 
     def __init__(self, templates_dir: Path, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
-        self.jinja = JinjaEnv(loader=JinjaFSLoader(templates_dir))
+        self.jinja = JinjaEnv(loader=JinjaFSLoader(templates_dir),
+                              autoescape=True)
 
     def serve(self) -> None:
         """Do .serve_forever on .server_port/.server_address until ^C."""
-- 
2.30.2