from __future__ import annotations import os import sqlite3 import secrets import smtplib from uuid import uuid4 from email.message import EmailMessage from functools import wraps from pathlib import Path from datetime import datetime from typing import Any from flask import Flask, Blueprint, g, flash, jsonify, redirect, render_template, request, session, url_for from werkzeug.security import check_password_hash, generate_password_hash from werkzeug.utils import secure_filename BASE_DIR = Path(__file__).resolve().parent DB_PATH = BASE_DIR / "hellas.db" UPLOAD_DIR = BASE_DIR / "static" / "uploads" ALLOWED_EXT = {".png", ".jpg", ".jpeg", ".webp", ".gif"} # Optional Prefix, z. B. /wawi, wenn die App hinter einem Sub‑Pfad läuft. URL_PREFIX = os.environ.get("URL_PREFIX", "").strip().rstrip("/") STATIC_URL_PATH = f"{URL_PREFIX}/static" if URL_PREFIX else "/static" app = Flask(__name__, static_url_path=STATIC_URL_PATH) # Session‑Secret für Login‑Cookies (in Produktion unbedingt setzen). app.secret_key = os.environ.get("SECRET_KEY", "change-me") app.config["SESSION_COOKIE_SAMESITE"] = "Lax" app.config["SESSION_COOKIE_SECURE"] = False app.config["MAX_CONTENT_LENGTH"] = 5 * 1024 * 1024 bp = Blueprint("bp", __name__) UPLOAD_DIR.mkdir(parents=True, exist_ok=True) def get_db() -> sqlite3.Connection: if "db" not in g: conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row g.db = conn return g.db @app.teardown_appcontext def close_db(exc: Exception | None) -> None: db = g.pop("db", None) if db is not None: db.close() def init_db() -> None: db = get_db() db.executescript( """ CREATE TABLE IF NOT EXISTS items ( id INTEGER PRIMARY KEY AUTOINCREMENT, artikel TEXT NOT NULL, groesse TEXT NOT NULL, preis REAL NOT NULL DEFAULT 0, bild_url TEXT, soll INTEGER NOT NULL DEFAULT 0, gezaehlt INTEGER NOT NULL DEFAULT 0, verkaeufe INTEGER NOT NULL DEFAULT 0, created_at TEXT NOT NULL, updated_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS ausbuchungen ( id INTEGER PRIMARY KEY AUTOINCREMENT, item_id INTEGER NOT NULL, menge INTEGER NOT NULL, grund TEXT, created_at TEXT NOT NULL, FOREIGN KEY(item_id) REFERENCES items(id) ); CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT NOT NULL UNIQUE, password_hash TEXT NOT NULL, created_at TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS orders ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, handy TEXT NOT NULL, mannschaft TEXT NOT NULL, artikel TEXT NOT NULL, groesse TEXT NOT NULL, menge INTEGER NOT NULL, notiz TEXT, created_at TEXT NOT NULL, done INTEGER NOT NULL DEFAULT 0, completed_by TEXT, completed_at TEXT, canceled INTEGER NOT NULL DEFAULT 0, canceled_by TEXT, canceled_at TEXT ); """ ) db.commit() ensure_price_column(db) ensure_image_column(db) ensure_orders_columns(db) ensure_admin_user(db) def ensure_price_column(db: sqlite3.Connection) -> None: # Fügt die Preisspalte nachträglich hinzu, falls DB älter ist. cols = db.execute("PRAGMA table_info(items)").fetchall() if any(c["name"] == "preis" for c in cols): return db.execute("ALTER TABLE items ADD COLUMN preis REAL NOT NULL DEFAULT 0") db.commit() def ensure_image_column(db: sqlite3.Connection) -> None: cols = db.execute("PRAGMA table_info(items)").fetchall() if any(c["name"] == "bild_url" for c in cols): return db.execute("ALTER TABLE items ADD COLUMN bild_url TEXT") db.commit() def ensure_orders_columns(db: sqlite3.Connection) -> None: cols = db.execute("PRAGMA table_info(orders)").fetchall() names = {c["name"] for c in cols} if "done" not in names: db.execute("ALTER TABLE orders ADD COLUMN done INTEGER NOT NULL DEFAULT 0") if "completed_by" not in names: db.execute("ALTER TABLE orders ADD COLUMN completed_by TEXT") if "completed_at" not in names: db.execute("ALTER TABLE orders ADD COLUMN completed_at TEXT") if "canceled" not in names: db.execute("ALTER TABLE orders ADD COLUMN canceled INTEGER NOT NULL DEFAULT 0") if "canceled_by" not in names: db.execute("ALTER TABLE orders ADD COLUMN canceled_by TEXT") if "canceled_at" not in names: db.execute("ALTER TABLE orders ADD COLUMN canceled_at TEXT") db.commit() @app.before_request def ensure_db() -> None: # Erstellt Tabellen, wenn sie fehlen (bei jedem Request idempotent). init_db() def now_iso() -> str: return datetime.now().strftime("%Y-%m-%d %H:%M") def save_upload(file) -> str | None: if not file or not getattr(file, "filename", ""): return None ext = os.path.splitext(file.filename)[1].lower() if ext not in ALLOWED_EXT: return None name = secure_filename(Path(file.filename).stem) or "image" safe_name = f"{name}-{uuid4().hex}{ext}" dest = UPLOAD_DIR / safe_name file.save(dest) return f"{URL_PREFIX}/static/uploads/{safe_name}" if URL_PREFIX else f"/static/uploads/{safe_name}" def ensure_admin_user(db: sqlite3.Connection) -> None: # Legt einen Admin‑User an, wenn noch kein Benutzer existiert. row = db.execute("SELECT COUNT(*) AS c FROM users").fetchone() if row and row["c"] > 0: return user = os.environ.get("APP_USER", "admin") password = os.environ.get("APP_PASSWORD", "admin") db.execute( """ INSERT INTO users (username, password_hash, created_at) VALUES (?, ?, ?) """, (user, generate_password_hash(password), now_iso()), ) db.commit() def login_required(fn): @wraps(fn) def wrapper(*args, **kwargs): if not session.get("user"): return redirect(url_for("bp.login", next=request.path)) return fn(*args, **kwargs) return wrapper @app.context_processor def inject_auth(): return {"logged_in": bool(session.get("user"))} def api_key_required(fn): # Schützt API‑Endpoints per X-API-Key oder ?key= Parameter. @wraps(fn) def wrapper(*args, **kwargs): expected = os.environ.get("APP_API_KEY", "") if not expected: return jsonify({"error": "API key not configured"}), 500 provided = request.headers.get("X-API-Key") or request.args.get("key") or "" if provided != expected: return jsonify({"error": "Unauthorized"}), 401 return fn(*args, **kwargs) return wrapper @bp.route("/") @login_required def index(): q = (request.args.get("q") or "").strip() sort = (request.args.get("sort") or "gezaehlt").strip().lower() direction = (request.args.get("dir") or "desc").strip().lower() allowed = {"artikel", "groesse", "soll", "gezaehlt", "verkaeufe"} if sort not in allowed: sort = "gezaehlt" if direction not in {"asc", "desc"}: direction = "desc" params: list[Any] = [] where = "" if q: where = "WHERE artikel LIKE ? OR groesse LIKE ?" like = f"%{q}%" params.extend([like, like]) sql = f""" SELECT * FROM items {where} ORDER BY {sort} {direction}, artikel ASC, groesse ASC """ rows = get_db().execute(sql, params).fetchall() grouped = {} for r in rows: key = r["artikel"] or "" grouped.setdefault(key, []).append(r) groups = [{"artikel": k, "rows": v} for k, v in grouped.items()] total = get_db().execute("SELECT COUNT(*) AS c FROM items").fetchone()["c"] total_bestand = get_db().execute("SELECT COALESCE(SUM(gezaehlt), 0) AS s FROM items").fetchone()["s"] open_orders = get_db().execute("SELECT COUNT(*) AS c FROM orders WHERE done = 0 AND canceled = 0").fetchone()["c"] return render_template( "index.html", groups=groups, q=q, sort=sort, direction=direction, total=total, total_bestand=total_bestand, open_orders=open_orders, ) @bp.route("/new", methods=["GET", "POST"]) @login_required def new_item(): if request.method == "POST": artikel = (request.form.get("artikel") or "").strip() groesse = (request.form.get("groesse") or "").strip() preis = float(request.form.get("preis") or 0) bild_url = (request.form.get("bild_url") or "").strip() uploaded = save_upload(request.files.get("bild_file")) if uploaded: bild_url = uploaded soll = int(request.form.get("soll") or 0) gezaehlt = int(request.form.get("gezaehlt") or 0) verkaeufe = int(request.form.get("verkaeufe") or 0) if artikel and groesse: db = get_db() db.execute( """ INSERT INTO items (artikel, groesse, preis, bild_url, soll, gezaehlt, verkaeufe, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, (artikel, groesse, preis, bild_url, soll, gezaehlt, verkaeufe, now_iso(), now_iso()), ) db.execute("UPDATE items SET preis = ?, bild_url = ? WHERE artikel = ?", (preis, bild_url, artikel)) db.commit() return redirect(url_for("bp.index")) return render_template("edit.html", item=None) @bp.route("/edit/", methods=["GET", "POST"]) @login_required def edit_item(item_id: int): db = get_db() item = db.execute("SELECT * FROM items WHERE id = ?", (item_id,)).fetchone() if item is None: return redirect(url_for("bp.index")) if request.method == "POST": artikel = (request.form.get("artikel") or "").strip() groesse = (request.form.get("groesse") or "").strip() preis = float(request.form.get("preis") or 0) bild_url = (request.form.get("bild_url") or "").strip() uploaded = save_upload(request.files.get("bild_file")) if uploaded: bild_url = uploaded soll = int(request.form.get("soll") or 0) gezaehlt = int(request.form.get("gezaehlt") or 0) verkaeufe = int(request.form.get("verkaeufe") or 0) db.execute( """ UPDATE items SET artikel = ?, groesse = ?, preis = ?, bild_url = ?, soll = ?, gezaehlt = ?, verkaeufe = ?, updated_at = ? WHERE id = ? """, (artikel, groesse, preis, bild_url, soll, gezaehlt, verkaeufe, now_iso(), item_id), ) db.execute("UPDATE items SET preis = ?, bild_url = ? WHERE artikel = ?", (preis, bild_url, artikel)) db.commit() return redirect(url_for("bp.index")) return render_template("edit.html", item=item) @bp.route("/delete/", methods=["POST"]) @login_required def delete_item(item_id: int): db = get_db() db.execute("DELETE FROM items WHERE id = ?", (item_id,)) db.commit() return redirect(url_for("bp.index")) @bp.route("/ausbuchen/", methods=["GET", "POST"]) @login_required def ausbuchen(item_id: int): db = get_db() item = db.execute("SELECT * FROM items WHERE id = ?", (item_id,)).fetchone() if item is None: return redirect(url_for("bp.index")) if request.method == "POST": menge = int(request.form.get("menge") or 0) grund = (request.form.get("grund") or "").strip() or None if menge > 0: neue_gezaehlt = max(int(item["gezaehlt"]) - menge, 0) neue_verkaeufe = int(item["verkaeufe"]) + menge db.execute( """ UPDATE items SET gezaehlt = ?, verkaeufe = ?, updated_at = ? WHERE id = ? """, (neue_gezaehlt, neue_verkaeufe, now_iso(), item_id), ) db.execute( """ INSERT INTO ausbuchungen (item_id, menge, grund, created_at) VALUES (?, ?, ?, ?) """, (item_id, menge, grund, now_iso()), ) db.commit() return redirect(url_for("bp.index")) return render_template("ausbuchen.html", item=item) @bp.route("/verkauf/", methods=["POST"]) @login_required def verkauf(item_id: int): db = get_db() item = db.execute("SELECT * FROM items WHERE id = ?", (item_id,)).fetchone() if item is None: return redirect(url_for("bp.index")) menge = 1 neue_gezaehlt = max(int(item["gezaehlt"]) - menge, 0) neue_verkaeufe = int(item["verkaeufe"]) + menge db.execute( """ UPDATE items SET gezaehlt = ?, verkaeufe = ?, updated_at = ? WHERE id = ? """, (neue_gezaehlt, neue_verkaeufe, now_iso(), item_id), ) db.execute( """ INSERT INTO ausbuchungen (item_id, menge, grund, created_at) VALUES (?, ?, ?, ?) """, (item_id, menge, "Verkauf", now_iso()), ) db.commit() return redirect(url_for("bp.index")) @bp.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": user = (request.form.get("user") or "").strip() password = request.form.get("password") or "" row = get_db().execute( "SELECT id, username, password_hash FROM users WHERE username = ?", (user,), ).fetchone() if row and check_password_hash(row["password_hash"], password): session["user"] = user nxt = request.args.get("next") or url_for("bp.index") return redirect(nxt) return render_template("login.html", error=True) return render_template("login.html", error=False) @bp.route("/logout") def logout(): session.clear() return redirect(url_for("bp.login")) @bp.route("/users", methods=["GET", "POST"]) @login_required def users(): db = get_db() error = None if request.method == "POST": username = (request.form.get("username") or "").strip() password = request.form.get("password") or "" if not username or not password: error = "Benutzer und Passwort sind erforderlich." else: try: db.execute( """ INSERT INTO users (username, password_hash, created_at) VALUES (?, ?, ?) """, (username, generate_password_hash(password), now_iso()), ) db.commit() except sqlite3.IntegrityError: error = "Benutzername existiert bereits." rows = db.execute("SELECT id, username, created_at FROM users ORDER BY username").fetchall() return render_template("users.html", rows=rows, error=error) @bp.route("/users/delete/", methods=["POST"]) @login_required def delete_user(user_id: int): db = get_db() count = db.execute("SELECT COUNT(*) AS c FROM users").fetchone()["c"] if count <= 1: return redirect(url_for("bp.users")) db.execute("DELETE FROM users WHERE id = ?", (user_id,)) db.commit() return redirect(url_for("bp.users")) @bp.route("/api/bestand", methods=["GET"]) @api_key_required def api_bestand(): return jsonify(build_bestand()) @bp.route("/proxy/bestand", methods=["GET"]) def proxy_bestand(): # Server‑Proxy ohne API‑Key (z. B. für öffentliche Anzeige). return jsonify(build_bestand()) def build_bestand() -> list[dict]: # Aggregiert DB‑Zeilen in die Struktur der Live‑Bestand Ansicht. rows = get_db().execute( """ SELECT artikel, groesse, preis, bild_url, soll, gezaehlt, verkaeufe FROM items ORDER BY artikel, groesse """ ).fetchall() data: dict[str, dict] = {} for r in rows: artikel = (r["artikel"] or "").strip() if not artikel: continue item = data.setdefault( artikel, {"artikel": artikel, "preis": 0, "bild_url": "", "rows": [], "totals": {"soll": 0, "gezaehlt": 0, "abweichung": 0, "fehlbestand": 0, "verkaeufe": 0}}, ) if not item["preis"]: item["preis"] = float(r["preis"] or 0) if not item["bild_url"]: item["bild_url"] = (r["bild_url"] or "").strip() soll = int(r["soll"] or 0) gezaehlt = int(r["gezaehlt"] or 0) verkaeufe = int(r["verkaeufe"] or 0) abw = gezaehlt - soll fehl = max(soll - gezaehlt, 0) item["rows"].append( { "groesse": r["groesse"], "soll": soll, "gezaehlt": gezaehlt, "abweichung": abw, "fehlbestand": fehl if fehl > 0 else None, "verkaeufe": verkaeufe, } ) t = item["totals"] t["soll"] += soll t["gezaehlt"] += gezaehlt t["abweichung"] += abw t["fehlbestand"] += fehl t["verkaeufe"] += verkaeufe result: list[dict] = [] for artikel, item in data.items(): t = item["totals"] if t["fehlbestand"] == 0: t["fehlbestand"] = None result.append(item) return result @bp.route("/order", methods=["POST"]) def order(): data = request.get_json(silent=True) or request.form required = ["name", "handy", "mannschaft", "artikel", "groesse", "menge"] if any(not (data.get(k) or "").strip() for k in required): return jsonify({"error": "Pflichtfelder fehlen."}), 400 db = get_db() db.execute( """ INSERT INTO orders (name, handy, mannschaft, artikel, groesse, menge, notiz, created_at, done, completed_by, completed_at, canceled, canceled_by, canceled_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0, NULL, NULL, 0, NULL, NULL) """, ( data.get("name"), data.get("handy"), data.get("mannschaft"), data.get("artikel"), data.get("groesse"), int(data.get("menge") or 0), data.get("notiz"), now_iso(), ), ) db.commit() to_addr = os.environ.get("ORDER_TO", "bjoern@welker.me") smtp_host = os.environ.get("SMTP_HOST") smtp_user = os.environ.get("SMTP_USER") smtp_pass = os.environ.get("SMTP_PASS") smtp_port = int(os.environ.get("SMTP_PORT", "587")) smtp_from = os.environ.get("SMTP_FROM", smtp_user or "no-reply@localhost") if not smtp_host or not smtp_user or not smtp_pass: return jsonify({"error": "Mailversand nicht konfiguriert."}), 500 msg = EmailMessage() msg["Subject"] = "Neue Bestellung (Hellas Bestand)" msg["From"] = smtp_from recipients = [a.strip() for a in to_addr.split(",") if a.strip()] msg["To"] = ", ".join(recipients) body = ( "Neue Bestellung:\n" f"Name: {data.get('name')}\n" f"Handy: {data.get('handy')}\n" f"Mannschaft: {data.get('mannschaft')}\n" f"Artikel: {data.get('artikel')}\n" f"Größe: {data.get('groesse')}\n" f"Menge: {data.get('menge')}\n" f"Notiz: {data.get('notiz') or '-'}\n" "WaWi: https://hellas.welker.me/wawi\n" ) msg.set_content(body) with smtplib.SMTP(smtp_host, smtp_port) as server: server.starttls() server.login(smtp_user, smtp_pass) server.send_message(msg, to_addrs=recipients) return jsonify({"ok": True}) @bp.route("/orders") @login_required def orders(): rows = get_db().execute( """ SELECT id, name, handy, mannschaft, artikel, groesse, menge, notiz, created_at, done, completed_by, completed_at, canceled, canceled_by, canceled_at FROM orders ORDER BY id DESC LIMIT 500 """ ).fetchall() return render_template("orders.html", rows=rows) @bp.route("/orders/complete/", methods=["POST"]) @login_required def complete_order(order_id: int): user = session.get("user") or "unknown" db = get_db() order = db.execute( """ SELECT id, artikel, groesse, menge, done, canceled FROM orders WHERE id = ? """, (order_id,), ).fetchone() if order is None: return redirect(url_for("bp.orders")) if not order["done"] and not order["canceled"]: menge = int(order["menge"] or 0) item = db.execute( """ SELECT gezaehlt FROM items WHERE artikel = ? AND groesse = ? """, (order["artikel"], order["groesse"]), ).fetchone() current = int(item["gezaehlt"] or 0) if item else 0 if current < menge: flash("Bestand reicht nicht aus, um die Bestellung abzuschließen.") return redirect(url_for("bp.orders")) db.execute( """ UPDATE items SET gezaehlt = CASE WHEN gezaehlt - ? < 0 THEN 0 ELSE gezaehlt - ? END, verkaeufe = verkaeufe + ?, updated_at = ? WHERE artikel = ? AND groesse = ? """, (menge, menge, menge, now_iso(), order["artikel"], order["groesse"]), ) db.execute( """ UPDATE orders SET done = 1, completed_by = ?, completed_at = ? WHERE id = ? """, (user, now_iso(), order_id), ) db.commit() return redirect(url_for("bp.orders")) @bp.route("/orders/cancel/", methods=["POST"]) @login_required def cancel_order(order_id: int): user = session.get("user") or "unknown" db = get_db() db.execute( """ UPDATE orders SET canceled = 1, canceled_by = ?, canceled_at = ? WHERE id = ? AND done = 0 """, (user, now_iso(), order_id), ) db.commit() return redirect(url_for("bp.orders")) @bp.route("/users/reset/", methods=["POST"]) @login_required def reset_user_password(user_id: int): db = get_db() user = db.execute("SELECT id, username FROM users WHERE id = ?", (user_id,)).fetchone() if user is None: return redirect(url_for("bp.users")) new_password = secrets.token_urlsafe(8).replace("-", "").replace("_", "")[:12] db.execute( "UPDATE users SET password_hash = ? WHERE id = ?", (generate_password_hash(new_password), user_id), ) db.commit() flash(f"Neues Passwort für {user['username']}: {new_password}") return redirect(url_for("bp.users")) app.register_blueprint(bp, url_prefix=URL_PREFIX or "") if __name__ == "__main__": app.run(debug=True)