Files
Hellas-Wawi/wawi/app.py
Bjoern Welker fd3f49a2e1 feat: add automatic image optimization and thumbnails
Image Processing:
- Add Pillow dependency for image manipulation
- Auto-create optimized versions on upload
- Generate main image (max 800x800, 85% quality)
- Generate thumbnail (max 400x400, 80% quality)
- Delete original after optimization

Quality Improvements:
- Auto-correct EXIF orientation (photos from phones)
- Convert RGBA/transparency to RGB with white background
- Use LANCZOS resampling for high-quality downscaling
- Optimize JPEG compression

Performance:
- Smaller file sizes = faster page loads
- Thumbnails for product listings
- Optimized full-size for detail views
- Reduced storage usage

Fallback:
- Graceful degradation if Pillow not installed
- Error handling preserves original on failure
- Logging for monitoring optimization success

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-06 08:22:11 +01:00

904 lines
31 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Hellas WaWi (Warenwirtschaft, i.e. inventory management) Flask app.

Overview:
- SQLite database for items, stock write-offs, users and orders.
- HTML views for administration plus JSON APIs for stock and orders.
- Optional e-mail notification for incoming orders.
"""
# The docstring has to precede the __future__ import; a string placed after it
# is just an expression statement and leaves the module without a __doc__.
from __future__ import annotations
import os
import sqlite3
import secrets
import smtplib
import time
import threading
import logging
from uuid import uuid4
from email.message import EmailMessage
from functools import wraps
from pathlib import Path
from datetime import datetime
from typing import Any
# Pillow is optional: HAS_PIL tells the upload code whether images can be
# optimized or have to be stored unmodified.
try:
    from PIL import Image
    HAS_PIL = True
except ImportError:
    HAS_PIL = False
    # The module-level ``logger`` is created further down in the file, so
    # referencing it here raised NameError exactly when Pillow was missing.
    # Look the logger up by name instead.
    logging.getLogger(__name__).warning("Pillow nicht installiert - Bild-Optimierung deaktiviert")
from flask import Flask, Blueprint, g, flash, jsonify, redirect, render_template, request, session, url_for
from flask_wtf.csrf import CSRFProtect
from werkzeug.security import check_password_hash, generate_password_hash
from werkzeug.utils import secure_filename
# --- Paths and upload settings ---
BASE_DIR = Path(__file__).resolve().parent
DB_PATH = BASE_DIR / "hellas.db"
UPLOAD_DIR = BASE_DIR / "static" / "uploads"
ALLOWED_EXT = {".png", ".jpg", ".jpeg", ".webp", ".gif"}

# --- Logging ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
logger = logging.getLogger(__name__)

# --- Flask application ---
# Optional prefix (e.g. /wawi) for deployments below a sub-path. With an empty
# prefix the static path is simply "/static".
URL_PREFIX = os.environ.get("URL_PREFIX", "").strip().rstrip("/")
STATIC_URL_PATH = f"{URL_PREFIX}/static"
app = Flask(__name__, static_url_path=STATIC_URL_PATH)

# Session secret for the login cookies. The placeholder value is only
# tolerated in debug mode or while pytest is loaded.
SECRET_KEY = os.environ.get("SECRET_KEY", "change-me")
if SECRET_KEY == "change-me":
    import sys

    running_under_pytest = "pytest" in sys.modules
    if not (app.debug or running_under_pytest):
        raise RuntimeError(
            "SECURITY ERROR: SECRET_KEY ist nicht gesetzt!\n"
            "Setze die Umgebungsvariable SECRET_KEY mit einem sicheren Wert.\n"
            "Beispiel: export SECRET_KEY=$(python3 -c 'import secrets; print(secrets.token_urlsafe(32))')"
        )
app.secret_key = SECRET_KEY
app.config.update(
    SESSION_COOKIE_SAMESITE="Lax",
    # Secure cookies are on unless COOKIE_SECURE is set to something other than "1".
    SESSION_COOKIE_SECURE=os.environ.get("COOKIE_SECURE", "1") == "1",
    SESSION_COOKIE_HTTPONLY=True,
    # Request bodies (and therefore uploads) are capped at 5 MiB.
    MAX_CONTENT_LENGTH=5 * 1024 * 1024,
)

# Enable CSRF protection for the whole app.
csrf = CSRFProtect(app)
bp = Blueprint("bp", __name__)
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)

# --- Process-local state ---
_DB_INIT_DONE = False  # set once init_db() has run in this process
_RATE_LIMIT: dict[str, list[float]] = {}  # ip -> timestamps of recent requests
_RATE_WINDOW = 60  # length of the rate-limit window in seconds
_RATE_MAX = 15  # requests allowed per ip within one window
# --- Datenbank & Hilfsfunktionen ---
def get_db() -> sqlite3.Connection:
    """Return the SQLite connection cached on ``flask.g``.

    The connection is opened on first use, configured to yield
    ``sqlite3.Row`` rows, and stored on ``g`` so later calls reuse it.
    """
    if "db" in g:
        return g.db
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    g.db = connection
    return g.db
@app.teardown_appcontext
def close_db(exc: Exception | None) -> None:
    """Close the cached DB connection, if any, when the app context ends."""
    connection = g.pop("db", None)
    if connection is None:
        return
    connection.close()
def init_db() -> None:
    """Create all tables if they are missing and run the schema migrations.

    Tables:
    - items: item master data plus target stock, counted stock and sales
    - ausbuchungen: history of stock write-offs
    - users: login users
    - orders: incoming orders (open / completed / canceled)
    """
    schema = """
        CREATE TABLE IF NOT EXISTS items (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        artikel TEXT NOT NULL, -- Artikelbezeichnung
        groesse TEXT NOT NULL, -- Variante/Größe
        preis REAL NOT NULL DEFAULT 0,-- Verkaufspreis
        bild_url TEXT, -- Optionales Produktbild
        soll INTEGER NOT NULL DEFAULT 0, -- SollBestand
        gezaehlt INTEGER NOT NULL DEFAULT 0, -- IstBestand
        verkaeufe INTEGER NOT NULL DEFAULT 0, -- Verkäufe gesamt
        created_at TEXT NOT NULL,
        updated_at TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS ausbuchungen (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        item_id INTEGER NOT NULL, -- Referenz auf items.id
        menge INTEGER NOT NULL, -- Abgangsmenge
        grund TEXT, -- z. B. Verkauf, Defekt, etc.
        created_at TEXT NOT NULL,
        FOREIGN KEY(item_id) REFERENCES items(id)
        );
        CREATE TABLE IF NOT EXISTS users (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        username TEXT NOT NULL UNIQUE,
        password_hash TEXT NOT NULL,
        created_at TEXT NOT NULL
        );
        CREATE TABLE IF NOT EXISTS orders (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        name TEXT NOT NULL, -- Besteller
        handy TEXT NOT NULL, -- Kontakt
        mannschaft TEXT NOT NULL, -- Team/Abteilung
        artikel TEXT NOT NULL,
        groesse TEXT NOT NULL,
        menge INTEGER NOT NULL,
        notiz TEXT,
        created_at TEXT NOT NULL,
        done INTEGER NOT NULL DEFAULT 0, -- abgeschlossen?
        completed_by TEXT, -- Username
        completed_at TEXT, -- Timestamp
        canceled INTEGER NOT NULL DEFAULT 0,-- storniert?
        canceled_by TEXT, -- Username
        canceled_at TEXT -- Timestamp
        );
    """
    db = get_db()
    db.executescript(schema)
    db.commit()

    # Each step checks the current schema itself and commits its own changes.
    migrations = (
        ensure_price_column,
        ensure_image_column,
        ensure_orders_columns,
        ensure_indexes,
        ensure_admin_user,
    )
    for migrate in migrations:
        migrate(db)
def ensure_price_column(db: sqlite3.Connection) -> None:
    """Add the ``preis`` column to ``items`` when an older DB lacks it.

    Rows are read by column name, so *db* must use ``sqlite3.Row`` as its
    row factory.
    """
    existing = {col["name"] for col in db.execute("PRAGMA table_info(items)").fetchall()}
    if "preis" not in existing:
        db.execute("ALTER TABLE items ADD COLUMN preis REAL NOT NULL DEFAULT 0")
        db.commit()
def ensure_image_column(db: sqlite3.Connection) -> None:
    """Add the ``bild_url`` column to ``items`` when an older DB lacks it.

    Rows are read by column name, so *db* must use ``sqlite3.Row`` as its
    row factory.
    """
    existing = {col["name"] for col in db.execute("PRAGMA table_info(items)").fetchall()}
    if "bild_url" not in existing:
        db.execute("ALTER TABLE items ADD COLUMN bild_url TEXT")
        db.commit()
def ensure_orders_columns(db: sqlite3.Connection) -> None:
    """Add every later-introduced ``orders`` column that is still missing.

    Rows are read by column name, so *db* must use ``sqlite3.Row`` as its
    row factory. The function always commits once at the end.
    """
    present = {col["name"] for col in db.execute("PRAGMA table_info(orders)").fetchall()}
    # (column name, column definition) in the order they are added.
    additions = (
        ("done", "INTEGER NOT NULL DEFAULT 0"),
        ("completed_by", "TEXT"),
        ("completed_at", "TEXT"),
        ("canceled", "INTEGER NOT NULL DEFAULT 0"),
        ("canceled_by", "TEXT"),
        ("canceled_at", "TEXT"),
    )
    for column, definition in additions:
        if column not in present:
            db.execute(f"ALTER TABLE orders ADD COLUMN {column} {definition}")
    db.commit()
def ensure_indexes(db: sqlite3.Connection) -> None:
    """Create the query indexes; safe to repeat thanks to IF NOT EXISTS."""
    statements = (
        # items.artikel is searched and filtered frequently.
        "CREATE INDEX IF NOT EXISTS idx_items_artikel ON items(artikel)",
        # (artikel, groesse) lookups that resolve a single item.
        "CREATE INDEX IF NOT EXISTS idx_items_artikel_groesse ON items(artikel, groesse)",
        # orders.done / orders.canceled back the "open orders" filter.
        "CREATE INDEX IF NOT EXISTS idx_orders_done ON orders(done)",
        "CREATE INDEX IF NOT EXISTS idx_orders_canceled ON orders(canceled)",
        "CREATE INDEX IF NOT EXISTS idx_orders_status ON orders(done, canceled)",
        # ausbuchungen.item_id is the foreign key used for joins.
        "CREATE INDEX IF NOT EXISTS idx_ausbuchungen_item_id ON ausbuchungen(item_id)",
    )
    for statement in statements:
        db.execute(statement)
    db.commit()
    logger.info("Datenbank-Indizes überprüft/erstellt")
@app.before_request
def ensure_db() -> None:
    """Run ``init_db()`` before the first request handled by this process."""
    global _DB_INIT_DONE
    if _DB_INIT_DONE:
        return
    init_db()
    _DB_INIT_DONE = True
def now_iso() -> str:
    """Return the current local time as ``YYYY-MM-DD HH:MM``."""
    current = datetime.now()
    return f"{current:%Y-%m-%d %H:%M}"
def save_upload(file) -> str | None:
"""Speichert einen BildUpload mit automatischer Optimierung und Thumbnail-Erstellung."""
if not file or not getattr(file, "filename", ""):
return None
ext = os.path.splitext(file.filename)[1].lower()
if ext not in ALLOWED_EXT:
return None
if not (file.mimetype or "").startswith("image/"):
return None
name = secure_filename(Path(file.filename).stem) or "image"
unique_id = uuid4().hex
# Original speichern
original_name = f"{name}-{unique_id}{ext}"
original_path = UPLOAD_DIR / original_name
file.save(original_path)
# Bild optimieren (falls Pillow verfügbar)
if HAS_PIL:
try:
with Image.open(original_path) as img:
# EXIF-Orientierung korrigieren
try:
from PIL import ImageOps
img = ImageOps.exif_transpose(img)
except Exception:
pass
# Konvertiere RGBA zu RGB (für JPEG)
if img.mode in ('RGBA', 'LA', 'P'):
background = Image.new('RGB', img.size, (255, 255, 255))
if img.mode == 'P':
img = img.convert('RGBA')
background.paste(img, mask=img.split()[-1] if img.mode in ('RGBA', 'LA') else None)
img = background
# Optimiertes Hauptbild (max 800x800)
optimized_name = f"{name}-{unique_id}-opt.jpg"
optimized_path = UPLOAD_DIR / optimized_name
img_copy = img.copy()
img_copy.thumbnail((800, 800), Image.Resampling.LANCZOS)
img_copy.save(optimized_path, 'JPEG', quality=85, optimize=True)
# Thumbnail (max 400x400)
thumb_name = f"{name}-{unique_id}-thumb.jpg"
thumb_path = UPLOAD_DIR / thumb_name
img.thumbnail((400, 400), Image.Resampling.LANCZOS)
img.save(thumb_path, 'JPEG', quality=80, optimize=True)
# Original löschen (verwenden nur optimierte Versionen)
original_path.unlink()
logger.info(f"Bild optimiert: {optimized_name} & {thumb_name}")
# Thumbnail-URL zurückgeben (wird als Standard verwendet)
return f"{URL_PREFIX}/static/uploads/{thumb_name}" if URL_PREFIX else f"/static/uploads/{thumb_name}"
except Exception as e:
logger.error(f"Fehler bei Bild-Optimierung: {e}")
# Fallback: Original verwenden
# Fallback wenn Pillow fehlt oder Fehler aufgetreten ist
return f"{URL_PREFIX}/static/uploads/{original_name}" if URL_PREFIX else f"/static/uploads/{original_name}"
def ensure_admin_user(db: sqlite3.Connection) -> None:
    """Create an initial user when the ``users`` table is empty.

    The user name comes from ``APP_USER`` (default ``admin``) and the password
    from ``APP_PASSWORD`` (default ``admin``). Falling back to the default
    password is logged as a warning, because it leaves the installation with
    well-known credentials.
    """
    row = db.execute("SELECT COUNT(*) AS c FROM users").fetchone()
    if row and row["c"] > 0:
        return

    user = os.environ.get("APP_USER", "admin")
    password = os.environ.get("APP_PASSWORD")
    if password is None:
        password = "admin"
        logger.warning(
            "APP_PASSWORD ist nicht gesetzt - Benutzer '%s' wird mit dem "
            "Standard-Passwort angelegt. Bitte umgehend ändern!",
            user,
        )

    db.execute(
        """
        INSERT INTO users (username, password_hash, created_at)
        VALUES (?, ?, ?)
        """,
        (user, generate_password_hash(password), now_iso()),
    )
    db.commit()
def login_required(fn):
    """Decorator: redirect to the login page unless a user is in the session.

    The current request path is passed along as ``next``.
    """
    @wraps(fn)
    def wrapper(*args, **kwargs):
        if session.get("user"):
            return fn(*args, **kwargs)
        login_url = url_for("bp.login", next=request.path)
        return redirect(login_url)

    return wrapper
@app.context_processor
def inject_auth():
    """Expose ``logged_in`` (True when the session has a user) to templates."""
    is_logged_in = bool(session.get("user"))
    return {"logged_in": is_logged_in}
def api_key_required(fn):
    """Decorator: require the API key from ``APP_API_KEY``.

    The key is read from the ``X-API-Key`` header or the ``key`` query
    parameter. Responds with 500 when no key is configured and with 401 when
    the supplied key does not match.
    """
    @wraps(fn)
    def wrapper(*args, **kwargs):
        expected = os.environ.get("APP_API_KEY", "")
        if not expected:
            return jsonify({"error": "API key not configured"}), 500
        provided = request.headers.get("X-API-Key") or request.args.get("key") or ""
        # Constant-time comparison so response timing does not leak how much
        # of the key was correct. Bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        if not secrets.compare_digest(provided.encode("utf-8"), expected.encode("utf-8")):
            return jsonify({"error": "Unauthorized"}), 401
        return fn(*args, **kwargs)

    return wrapper
def rate_limited(ip: str) -> bool:
    """Return True when *ip* has used up its requests for the current window.

    Simple in-memory limit: at most ``_RATE_MAX`` requests per ``_RATE_WINDOW``
    seconds and key. A request that is let through is recorded; a rejected one
    is not.
    """
    now = time.time()
    recent = [t for t in _RATE_LIMIT.get(ip, ()) if now - t < _RATE_WINDOW]
    limited = len(recent) >= _RATE_MAX
    if not limited:
        recent.append(now)
    _RATE_LIMIT[ip] = recent

    # Housekeeping: keys are taken from a client-supplied header by the
    # caller, so stale buckets must be dropped or the dict grows without
    # bound. Timestamps are appended in order, so the last one is the newest.
    if len(_RATE_LIMIT) > 1024:
        for key, stamps in list(_RATE_LIMIT.items()):
            if not stamps or now - stamps[-1] >= _RATE_WINDOW:
                _RATE_LIMIT.pop(key, None)
    return limited
# --- HTML views (admin) ---
@bp.route("/")
@login_required
def index():
    """Start page: item overview with search filter, sorting and totals.

    Query parameters: ``q`` (substring match on item name or size), ``sort``
    (one of the whitelisted columns) and ``dir`` (``asc``/``desc``). Unknown
    sort or direction values fall back to ``gezaehlt`` / ``desc``.
    """
    q = (request.args.get("q") or "").strip()
    sort = (request.args.get("sort") or "gezaehlt").strip().lower()
    direction = (request.args.get("dir") or "desc").strip().lower()

    # Both values are interpolated into the SQL text below, so only
    # whitelisted identifiers may pass.
    sortable = {"artikel", "groesse", "soll", "gezaehlt", "verkaeufe"}
    sort = sort if sort in sortable else "gezaehlt"
    direction = direction if direction in {"asc", "desc"} else "desc"

    where = ""
    params: list[Any] = []
    if q:
        pattern = f"%{q}%"
        where = "WHERE artikel LIKE ? OR groesse LIKE ?"
        params += [pattern, pattern]

    sql = f"""
        SELECT *
        FROM items
        {where}
        ORDER BY {sort} {direction}, artikel ASC, groesse ASC
    """
    db = get_db()
    rows = db.execute(sql, params).fetchall()

    # Group the rows by item name, keeping the order of first appearance.
    by_article = {}
    for row in rows:
        by_article.setdefault(row["artikel"] or "", []).append(row)
    groups = [{"artikel": name, "rows": members} for name, members in by_article.items()]

    total = db.execute("SELECT COUNT(*) AS c FROM items").fetchone()["c"]
    total_bestand = db.execute("SELECT COALESCE(SUM(gezaehlt), 0) AS s FROM items").fetchone()["s"]
    open_orders = db.execute("SELECT COUNT(*) AS c FROM orders WHERE done = 0 AND canceled = 0").fetchone()["c"]

    return render_template(
        "index.html",
        groups=groups,
        q=q,
        sort=sort,
        direction=direction,
        total=total,
        total_bestand=total_bestand,
        open_orders=open_orders,
    )
def _parse_item_form(form) -> tuple[dict[str, Any] | None, str | None]:
    """Validate the item form and return ``(values, error)``.

    Exactly one element of the pair is None. Empty numeric fields count as 0
    and the price may use a decimal comma.
    """
    import math

    artikel = (form.get("artikel") or "").strip()
    groesse = (form.get("groesse") or "").strip()
    if not artikel or not groesse:
        return None, "Artikel und Größe sind erforderlich."

    try:
        raw_preis = (form.get("preis") or "").strip().replace(",", ".")
        preis = float(raw_preis) if raw_preis else 0.0
        counts = {}
        for field in ("soll", "gezaehlt", "verkaeufe"):
            raw_count = (form.get(field) or "").strip()
            counts[field] = int(raw_count) if raw_count else 0
    except ValueError:
        return None, "Preis, Soll, Bestand und Verkäufe müssen gültige Zahlen sein."
    # float() also accepts "nan" and "inf", which are not usable prices.
    if not math.isfinite(preis):
        return None, "Preis, Soll, Bestand und Verkäufe müssen gültige Zahlen sein."

    values = {
        "artikel": artikel,
        "groesse": groesse,
        "preis": preis,
        "bild_url": (form.get("bild_url") or "").strip(),
    }
    values.update(counts)
    return values, None


@bp.route("/new", methods=["GET", "POST"])
@login_required
def new_item():
    """Create an item (with optional image upload).

    Invalid input re-renders the form with a flashed message and status 400
    instead of failing with an unhandled ValueError.
    """
    if request.method == "POST":
        values, error = _parse_item_form(request.form)
        if error:
            flash(error)
            return render_template("edit.html", item=None), 400

        # Store the upload only after validation so rejected forms do not
        # leave files behind.
        uploaded = save_upload(request.files.get("bild_file"))
        if uploaded:
            values["bild_url"] = uploaded

        db = get_db()
        stamp = now_iso()
        db.execute(
            """
            INSERT INTO items (artikel, groesse, preis, bild_url, soll, gezaehlt, verkaeufe, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                values["artikel"],
                values["groesse"],
                values["preis"],
                values["bild_url"],
                values["soll"],
                values["gezaehlt"],
                values["verkaeufe"],
                stamp,
                stamp,
            ),
        )
        # Price and image are shared by all rows with the same item name.
        db.execute(
            "UPDATE items SET preis = ?, bild_url = ? WHERE artikel = ?",
            (values["preis"], values["bild_url"], values["artikel"]),
        )
        db.commit()
        return redirect(url_for("bp.index"))
    return render_template("edit.html", item=None)


@bp.route("/edit/<int:item_id>", methods=["GET", "POST"])
@login_required
def edit_item(item_id: int):
    """Edit an item; an unknown id redirects to the overview.

    Uses the same validation as ``new_item``: invalid input re-renders the
    form with a flashed message and status 400.
    """
    db = get_db()
    item = db.execute("SELECT * FROM items WHERE id = ?", (item_id,)).fetchone()
    if item is None:
        return redirect(url_for("bp.index"))

    if request.method == "POST":
        values, error = _parse_item_form(request.form)
        if error:
            flash(error)
            return render_template("edit.html", item=item), 400

        uploaded = save_upload(request.files.get("bild_file"))
        if uploaded:
            values["bild_url"] = uploaded

        db.execute(
            """
            UPDATE items
            SET artikel = ?, groesse = ?, preis = ?, bild_url = ?, soll = ?, gezaehlt = ?, verkaeufe = ?, updated_at = ?
            WHERE id = ?
            """,
            (
                values["artikel"],
                values["groesse"],
                values["preis"],
                values["bild_url"],
                values["soll"],
                values["gezaehlt"],
                values["verkaeufe"],
                now_iso(),
                item_id,
            ),
        )
        # Price and image are shared by all rows with the same item name.
        db.execute(
            "UPDATE items SET preis = ?, bild_url = ? WHERE artikel = ?",
            (values["preis"], values["bild_url"], values["artikel"]),
        )
        db.commit()
        return redirect(url_for("bp.index"))
    return render_template("edit.html", item=item)
@bp.route("/delete/<int:item_id>", methods=["POST"])
@login_required
def delete_item(item_id: int):
    """Delete the item with the given id and return to the overview."""
    connection = get_db()
    connection.execute("DELETE FROM items WHERE id = ?", [item_id])
    connection.commit()
    overview = url_for("bp.index")
    return redirect(overview)
@bp.route("/ausbuchen/<int:item_id>", methods=["GET", "POST"])
@login_required
def ausbuchen(item_id: int):
    """Record a stock write-off: lowers counted stock and raises sales.

    GET shows the form. POST books ``menge`` units (stock never drops below 0)
    and logs the write-off with the optional reason. A missing, non-numeric or
    non-positive quantity books nothing and flashes a message.
    """
    db = get_db()
    item = db.execute("SELECT * FROM items WHERE id = ?", (item_id,)).fetchone()
    if item is None:
        return redirect(url_for("bp.index"))
    if request.method != "POST":
        return render_template("ausbuchen.html", item=item)

    try:
        menge = int((request.form.get("menge") or "0").strip())
    except ValueError:
        # Previously an unhandled ValueError (HTTP 500).
        menge = 0
    grund = (request.form.get("grund") or "").strip() or None

    if menge <= 0:
        flash("Menge muss eine ganze Zahl größer als 0 sein.")
        return redirect(url_for("bp.index"))

    stamp = now_iso()
    # Compute the new values inside the UPDATE (same CASE form as
    # complete_order) so concurrent bookings cannot overwrite each other.
    updated = db.execute(
        """
        UPDATE items
        SET gezaehlt = CASE WHEN gezaehlt - ? < 0 THEN 0 ELSE gezaehlt - ? END,
        verkaeufe = verkaeufe + ?,
        updated_at = ?
        WHERE id = ?
        """,
        (menge, menge, menge, stamp, item_id),
    )
    # Only log the write-off if the item still existed at update time.
    if updated.rowcount:
        db.execute(
            """
            INSERT INTO ausbuchungen (item_id, menge, grund, created_at)
            VALUES (?, ?, ?, ?)
            """,
            (item_id, menge, grund, stamp),
        )
    db.commit()
    return redirect(url_for("bp.index"))
@bp.route("/verkauf/<int:item_id>", methods=["POST"])
@login_required
def verkauf(item_id: int):
    """Quick sale: book one unit and log it as a write-off with reason "Verkauf".

    Counted stock never drops below 0. An unknown item id changes nothing.
    """
    db = get_db()
    menge = 1
    stamp = now_iso()
    # Compute the new values inside the UPDATE so concurrent sales cannot
    # overwrite each other (the old read-then-write could lose a booking).
    updated = db.execute(
        """
        UPDATE items
        SET gezaehlt = CASE WHEN gezaehlt - ? < 0 THEN 0 ELSE gezaehlt - ? END,
        verkaeufe = verkaeufe + ?,
        updated_at = ?
        WHERE id = ?
        """,
        (menge, menge, menge, stamp, item_id),
    )
    # rowcount is 0 when no item has this id; nothing is logged then.
    if updated.rowcount:
        db.execute(
            """
            INSERT INTO ausbuchungen (item_id, menge, grund, created_at)
            VALUES (?, ?, ?, ?)
            """,
            (item_id, menge, "Verkauf", stamp),
        )
        db.commit()
    return redirect(url_for("bp.index"))
# --- Auth ---
def _safe_next_url(target: str | None) -> str | None:
    """Return *target* if it is a local absolute path, otherwise None.

    Rejects anything that could send the browser to another host:
    scheme-relative URLs (``//host``), backslashes (treated like ``/`` by some
    browsers), control characters and everything not starting with ``/``.
    """
    if not target or not target.startswith("/") or target.startswith("//"):
        return None
    if "\\" in target or any(ch < " " for ch in target):
        return None
    return target


@bp.route("/login", methods=["GET", "POST"])
def login():
    """Login form and session handling.

    After a successful login the user is sent to the ``next`` query parameter
    when it is a local path, otherwise to the overview. ``next`` is
    attacker-controllable, so it is validated to prevent an open redirect.
    """
    if request.method != "POST":
        return render_template("login.html", error=False)

    user = (request.form.get("user") or "").strip()
    password = request.form.get("password") or ""
    row = get_db().execute(
        "SELECT id, username, password_hash FROM users WHERE username = ?",
        (user,),
    ).fetchone()
    if row and check_password_hash(row["password_hash"], password):
        session["user"] = user
        logger.info("Erfolgreicher Login: %s", user)
        nxt = _safe_next_url(request.args.get("next")) or url_for("bp.index")
        return redirect(nxt)

    logger.warning("Fehlgeschlagener Login-Versuch: %s", user)
    return render_template("login.html", error=True)
@bp.route("/logout")
def logout():
    """Clear the session and go back to the login page."""
    session.clear()
    login_url = url_for("bp.login")
    return redirect(login_url)
# --- Benutzerverwaltung ---
@bp.route("/users", methods=["GET", "POST"])
@login_required
def users():
    """User administration: list all users and, on POST, create a new one.

    Validation problems are handed to the template as ``error``.
    """
    db = get_db()
    error = None

    if request.method == "POST":
        username = (request.form.get("username") or "").strip()
        password = request.form.get("password") or ""
        if username and password:
            try:
                db.execute(
                    """
                    INSERT INTO users (username, password_hash, created_at)
                    VALUES (?, ?, ?)
                    """,
                    (username, generate_password_hash(password), now_iso()),
                )
                db.commit()
            except sqlite3.IntegrityError:
                # username carries a UNIQUE constraint.
                error = "Benutzername existiert bereits."
        else:
            error = "Benutzer und Passwort sind erforderlich."

    listing = db.execute("SELECT id, username, created_at FROM users ORDER BY username").fetchall()
    return render_template("users.html", rows=listing, error=error)
@bp.route("/users/delete/<int:user_id>", methods=["POST"])
@login_required
def delete_user(user_id: int):
    """Delete a user unless it is the only one left."""
    db = get_db()
    remaining = db.execute("SELECT COUNT(*) AS c FROM users").fetchone()["c"]
    if remaining > 1:
        db.execute("DELETE FROM users WHERE id = ?", (user_id,))
        db.commit()
    return redirect(url_for("bp.users"))
# --- JSON APIs ---
@bp.route("/api/bestand", methods=["GET"])
@api_key_required
def api_bestand():
    """JSON API for the stock overview; requires the API key."""
    payload = build_bestand()
    return jsonify(payload)
@bp.route("/proxy/bestand", methods=["GET"])
def proxy_bestand():
    """Same stock JSON as ``api_bestand`` but without an API key check."""
    payload = build_bestand()
    return jsonify(payload)
def build_bestand() -> list[dict]:
    """Aggregate the item rows into one entry per item name.

    Each entry holds ``artikel``, ``preis`` and ``bild_url`` (taken from the
    first row that has a non-empty value), the per-size ``rows`` and the
    summed ``totals``. A shortfall of 0 is reported as None, both per row and
    in the totals. Rows without an item name are skipped.
    """
    query = """
        SELECT artikel, groesse, preis, bild_url, soll, gezaehlt, verkaeufe
        FROM items
        ORDER BY artikel, groesse
    """
    articles: dict[str, dict] = {}
    for row in get_db().execute(query).fetchall():
        name = (row["artikel"] or "").strip()
        if not name:
            continue
        if name not in articles:
            articles[name] = {
                "artikel": name,
                "preis": 0,
                "bild_url": "",
                "rows": [],
                "totals": {"soll": 0, "gezaehlt": 0, "abweichung": 0, "fehlbestand": 0, "verkaeufe": 0},
            }
        entry = articles[name]

        # Keep the first non-zero price / non-empty image seen for this item.
        entry["preis"] = entry["preis"] or float(row["preis"] or 0)
        entry["bild_url"] = entry["bild_url"] or (row["bild_url"] or "").strip()

        soll = int(row["soll"] or 0)
        gezaehlt = int(row["gezaehlt"] or 0)
        verkaeufe = int(row["verkaeufe"] or 0)
        deviation = gezaehlt - soll
        shortfall = max(soll - gezaehlt, 0)

        entry["rows"].append(
            {
                "groesse": row["groesse"],
                "soll": soll,
                "gezaehlt": gezaehlt,
                "abweichung": deviation,
                "fehlbestand": shortfall or None,
                "verkaeufe": verkaeufe,
            }
        )

        totals = entry["totals"]
        for field, amount in (
            ("soll", soll),
            ("gezaehlt", gezaehlt),
            ("abweichung", deviation),
            ("fehlbestand", shortfall),
            ("verkaeufe", verkaeufe),
        ):
            totals[field] += amount

    for entry in articles.values():
        if entry["totals"]["fehlbestand"] == 0:
            entry["totals"]["fehlbestand"] = None
    return list(articles.values())
@bp.route("/order", methods=["POST"])
@csrf.exempt  # JSON API without CSRF protection (uses the API key instead)
def order():
    """Create an order from JSON or form data and send a notification mail.

    Responses:
    - 429 when the caller is rate limited
    - 401 when ``APP_API_KEY`` is set and the supplied key does not match
    - 400 when a required field is missing or ``menge`` is not a positive integer
    - ``{"ok": true}`` once the order is stored

    The mail is sent in a background thread. Missing SMTP settings no longer
    turn an already stored order into an error response; they are logged.
    """
    # NOTE(review): X-Forwarded-For is client-supplied unless a trusted proxy
    # overwrites it - confirm the deployment before relying on it.
    ip = request.headers.get("X-Forwarded-For", request.remote_addr or "unknown").split(",")[0].strip()
    if rate_limited(ip):
        return jsonify({"error": "Zu viele Anfragen."}), 429

    expected_key = os.environ.get("APP_API_KEY", "")
    if expected_key:
        provided = request.headers.get("X-Order-Key") or request.args.get("key") or ""
        # Constant-time comparison; bytes because compare_digest rejects
        # non-ASCII str arguments.
        if not secrets.compare_digest(provided.encode("utf-8"), expected_key.encode("utf-8")):
            return jsonify({"error": "Unauthorized"}), 401

    # Only a JSON object is usable as payload; anything else (e.g. a JSON
    # list) falls back to the form data instead of crashing on .get().
    payload = request.get_json(silent=True)
    data = payload if isinstance(payload, dict) and payload else request.form

    def field(key: str) -> str:
        """Return the submitted value as stripped text ("" when absent)."""
        value = data.get(key)
        # JSON clients may send numbers (e.g. "menge": 2), hence str().
        return "" if value is None else str(value).strip()

    required = ["name", "handy", "mannschaft", "artikel", "groesse", "menge"]
    values = {key: field(key) for key in required}
    if not all(values.values()):
        return jsonify({"error": "Pflichtfelder fehlen."}), 400
    try:
        menge = int(values["menge"])
    except ValueError:
        menge = 0
    if menge <= 0:
        return jsonify({"error": "Menge muss größer als 0 sein."}), 400
    notiz = field("notiz") or None

    db = get_db()
    db.execute(
        """
        INSERT INTO orders (name, handy, mannschaft, artikel, groesse, menge, notiz, created_at, done, completed_by, completed_at, canceled, canceled_by, canceled_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, 0, NULL, NULL, 0, NULL, NULL)
        """,
        (
            values["name"],
            values["handy"],
            values["mannschaft"],
            values["artikel"],
            values["groesse"],
            menge,
            notiz,
            now_iso(),
        ),
    )
    db.commit()
    logger.info(
        "Neue Bestellung: %s (%s) x%s von %s",
        values["artikel"], values["groesse"], menge, values["name"],
    )

    to_addr = os.environ.get("ORDER_TO", "bjoern@welker.me")
    smtp_host = os.environ.get("SMTP_HOST")
    smtp_user = os.environ.get("SMTP_USER")
    smtp_pass = os.environ.get("SMTP_PASS")
    smtp_from = os.environ.get("SMTP_FROM", smtp_user or "no-reply@localhost")
    if not (smtp_host and smtp_user and smtp_pass):
        # The order is already committed; answering 500 here made clients
        # retry and store duplicates.
        logger.warning("Mailversand nicht konfiguriert - Bestellung ohne Benachrichtigung gespeichert")
        return jsonify({"ok": True})
    try:
        smtp_port = int(os.environ.get("SMTP_PORT", "587"))
    except ValueError:
        logger.warning("Ungültiger SMTP_PORT - verwende 587")
        smtp_port = 587

    msg = EmailMessage()
    msg["Subject"] = "Neue Bestellung (Hellas Bestand)"
    msg["From"] = smtp_from
    recipients = [a.strip() for a in to_addr.split(",") if a.strip()]
    msg["To"] = ", ".join(recipients)
    body = (
        "Neue Bestellung:\n"
        f"Name: {values['name']}\n"
        f"Handy: {values['handy']}\n"
        f"Mannschaft: {values['mannschaft']}\n"
        f"Artikel: {values['artikel']}\n"
        f"Größe: {values['groesse']}\n"
        f"Menge: {menge}\n"
        f"Notiz: {notiz or '-'}\n"
        "WaWi: https://hellas.welker.me/wawi\n"
    )
    msg.set_content(body)

    def _send():
        """Send the order mail; failures are logged, never raised."""
        try:
            # The timeout keeps an unreachable server from blocking the
            # thread indefinitely.
            with smtplib.SMTP(smtp_host, smtp_port, timeout=30) as server:
                server.starttls()
                server.login(smtp_user, smtp_pass)
                server.send_message(msg, to_addrs=recipients)
            logger.info(f"Bestellungs-Email erfolgreich versendet an {', '.join(recipients)}")
        except smtplib.SMTPAuthenticationError as e:
            logger.error(f"SMTP-Authentifizierung fehlgeschlagen: {e}")
        except smtplib.SMTPException as e:
            logger.error(f"SMTP-Fehler beim Email-Versand: {e}")
        except Exception as e:
            logger.error(f"Unerwarteter Fehler beim Email-Versand: {e}", exc_info=True)

    threading.Thread(target=_send, daemon=True).start()
    return jsonify({"ok": True})
# --- Bestellungen (Admin) ---
@bp.route("/orders")
@login_required
def orders():
    """Order list for the admin area: the 500 newest orders, newest first."""
    query = """
        SELECT id, name, handy, mannschaft, artikel, groesse, menge, notiz, created_at, done, completed_by, completed_at, canceled, canceled_by, canceled_at
        FROM orders
        ORDER BY id DESC
        LIMIT 500
    """
    listing = get_db().execute(query).fetchall()
    return render_template("orders.html", rows=listing)
@bp.route("/orders/complete/<int:order_id>", methods=["POST"])
@login_required
def complete_order(order_id: int):
    """Complete an open order and deduct its quantity from stock.

    Unknown, completed or canceled orders are left untouched. If the counted
    stock of the matching item/size is lower than the ordered quantity, a
    message is flashed and nothing is changed.
    """
    user = session.get("user") or "unknown"
    db = get_db()
    row = db.execute(
        """
        SELECT id, artikel, groesse, menge, done, canceled
        FROM orders
        WHERE id = ?
        """,
        (order_id,),
    ).fetchone()
    if row is None or row["done"] or row["canceled"]:
        return redirect(url_for("bp.orders"))

    menge = int(row["menge"] or 0)
    variant = (row["artikel"], row["groesse"])
    stock = db.execute(
        """
        SELECT gezaehlt
        FROM items
        WHERE artikel = ? AND groesse = ?
        """,
        variant,
    ).fetchone()
    # A missing item counts as zero stock.
    available = int(stock["gezaehlt"] or 0) if stock else 0
    if available < menge:
        flash("Bestand reicht nicht aus, um die Bestellung abzuschließen.")
        return redirect(url_for("bp.orders"))

    db.execute(
        """
        UPDATE items
        SET gezaehlt = CASE WHEN gezaehlt - ? < 0 THEN 0 ELSE gezaehlt - ? END,
        verkaeufe = verkaeufe + ?,
        updated_at = ?
        WHERE artikel = ? AND groesse = ?
        """,
        (menge, menge, menge, now_iso(), *variant),
    )
    db.execute(
        """
        UPDATE orders
        SET done = 1, completed_by = ?, completed_at = ?
        WHERE id = ?
        """,
        (user, now_iso(), order_id),
    )
    db.commit()
    logger.info(f"Bestellung #{order_id} abgeschlossen von {user}")
    return redirect(url_for("bp.orders"))
@bp.route("/orders/cancel/<int:order_id>", methods=["POST"])
@login_required
def cancel_order(order_id: int):
    """Cancel an order, but only while it is still open.

    Completed orders are left untouched. Orders that are already canceled are
    skipped as well, so the original ``canceled_by`` / ``canceled_at`` values
    are not overwritten by a repeated request.
    """
    user = session.get("user") or "unknown"
    db = get_db()
    result = db.execute(
        """
        UPDATE orders
        SET canceled = 1, canceled_by = ?, canceled_at = ?
        WHERE id = ? AND done = 0 AND canceled = 0
        """,
        (user, now_iso(), order_id),
    )
    db.commit()
    if result.rowcount:
        logger.info("Bestellung #%s storniert von %s", order_id, user)
    return redirect(url_for("bp.orders"))
@bp.route("/users/reset/<int:user_id>", methods=["POST"])
@login_required
def reset_user_password(user_id: int):
    """Set a new random password for a user and show it once as a flash message.

    An unknown user id redirects back to the user list without changes.
    """
    import string

    db = get_db()
    user = db.execute("SELECT id, username FROM users WHERE id = ?", (user_id,)).fetchone()
    if user is None:
        return redirect(url_for("bp.users"))

    # Always 12 alphanumeric characters. The previous approach stripped "-"
    # and "_" from an 11-character token, so the password could end up
    # noticeably shorter.
    alphabet = string.ascii_letters + string.digits
    new_password = "".join(secrets.choice(alphabet) for _ in range(12))

    db.execute(
        "UPDATE users SET password_hash = ? WHERE id = ?",
        (generate_password_hash(new_password), user_id),
    )
    db.commit()
    flash(f"Neues Passwort für {user['username']}: {new_password}")
    return redirect(url_for("bp.users"))
# Mount all routes below the optional URL prefix.
app.register_blueprint(bp, url_prefix=URL_PREFIX or "")

if __name__ == "__main__":
    # Development server only. Debug mode stays on by default but can be
    # switched off with FLASK_DEBUG=0/false/no.
    debug_enabled = os.environ.get("FLASK_DEBUG", "1").strip().lower() not in {"0", "false", "no"}
    app.run(debug=debug_enabled)