89 lines
2.6 KiB
Python
89 lines
2.6 KiB
Python
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import re
|
||
import sqlite3
|
||
from pathlib import Path
|
||
from datetime import datetime
|
||
|
||
BASE_DIR = Path(__file__).resolve().parent
|
||
DB_PATH = BASE_DIR / "hellas.db"
|
||
|
||
|
||
def now_iso() -> str:
|
||
return datetime.now().strftime("%Y-%m-%d %H:%M")
|
||
|
||
|
||
def ensure_db(conn: sqlite3.Connection) -> None:
|
||
conn.executescript(
|
||
"""
|
||
CREATE TABLE IF NOT EXISTS items (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
artikel TEXT NOT NULL,
|
||
groesse TEXT NOT NULL,
|
||
preis REAL NOT NULL DEFAULT 0,
|
||
soll INTEGER NOT NULL DEFAULT 0,
|
||
gezaehlt INTEGER NOT NULL DEFAULT 0,
|
||
verkaeufe INTEGER NOT NULL DEFAULT 0,
|
||
created_at TEXT NOT NULL,
|
||
updated_at TEXT NOT NULL
|
||
);
|
||
"""
|
||
)
|
||
conn.commit()
|
||
|
||
|
||
def extract_data(html_text: str) -> list[dict]:
|
||
# Liest den const DATA = [...] Block aus der alten HTML‑Datei.
|
||
match = re.search(r"const\s+DATA\s*=\s*(\[[\s\S]*?\]);", html_text)
|
||
if not match:
|
||
raise ValueError("DATA-Block nicht gefunden.")
|
||
raw = match.group(1)
|
||
return json.loads(raw)
|
||
|
||
|
||
def main() -> int:
|
||
parser = argparse.ArgumentParser(description="Import aus hellas_bestand.html in SQLite.")
|
||
parser.add_argument("html", type=Path, help="Pfad zur hellas_bestand.html")
|
||
parser.add_argument("--truncate", action="store_true", help="Vor Import alle Items löschen.")
|
||
args = parser.parse_args()
|
||
|
||
html_text = args.html.read_text(encoding="utf-8")
|
||
data = extract_data(html_text)
|
||
|
||
conn = sqlite3.connect(DB_PATH)
|
||
conn.row_factory = sqlite3.Row
|
||
ensure_db(conn)
|
||
|
||
if args.truncate:
|
||
conn.execute("DELETE FROM items")
|
||
conn.commit()
|
||
|
||
now = now_iso()
|
||
insert_sql = """
|
||
INSERT INTO items (artikel, groesse, soll, gezaehlt, verkaeufe, created_at, updated_at)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||
"""
|
||
|
||
rows_added = 0
|
||
for item in data:
|
||
artikel = (item.get("artikel") or "").strip()
|
||
for row in item.get("rows") or []:
|
||
groesse = (row.get("groesse") or "").strip()
|
||
soll = int(row.get("soll") or 0)
|
||
gezaehlt = int(row.get("gezaehlt") or 0)
|
||
verkaeufe = int(row.get("verkaeufe") or 0)
|
||
if not artikel or not groesse:
|
||
continue
|
||
conn.execute(insert_sql, (artikel, groesse, soll, gezaehlt, verkaeufe, now, now))
|
||
rows_added += 1
|
||
|
||
conn.commit()
|
||
print(f"Import fertig. Zeilen eingefügt: {rows_added}")
|
||
return 0
|
||
|
||
|
||
if __name__ == "__main__":
|
||
raise SystemExit(main())
|