import asyncio
import json
import logging
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
from urllib.parse import urlencode

import httpx
import websockets
from dotenv import load_dotenv
from fastapi import FastAPI, Query, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse

# Load variables from a .env file (if one is found) into the environment before
# the os.getenv() calls below read them.
load_dotenv()

# Logging verbosity comes from LOG_LEVEL (default INFO); the value is upper-cased
# so that e.g. "debug" is accepted as a level name.
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(level=LOG_LEVEL, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger("twitch-events")

# Twitch application credentials. An empty string means "not configured";
# ensure_config() raises when either one is empty.
TWITCH_CLIENT_ID = os.getenv("TWITCH_CLIENT_ID", "")
TWITCH_CLIENT_SECRET = os.getenv("TWITCH_CLIENT_SECRET", "")
# Redirect URI sent to Twitch both on login and on the code-for-token exchange.
TWITCH_REDIRECT_URI = os.getenv("TWITCH_REDIRECT_URI", "https://misterdietz.de/twitch-events/auth/callback")
# NOTE(review): PUBLIC_BASE_URL is not referenced anywhere in the visible code.
PUBLIC_BASE_URL = os.getenv("PUBLIC_BASE_URL", "https://misterdietz.de")
# JSON file holding the per-channel OAuth tokens (see load_tokens / save_tokens).
TOKEN_STORE_PATH = Path(os.getenv("TOKEN_STORE_PATH", "/var/www/misterdietz.de/twitch-events/tokens.json"))

# Twitch OAuth, Helix and EventSub endpoints used by this service.
TWITCH_AUTH_URL = "https://id.twitch.tv/oauth2/authorize"
TWITCH_TOKEN_URL = "https://id.twitch.tv/oauth2/token"
TWITCH_VALIDATE_URL = "https://id.twitch.tv/oauth2/validate"
TWITCH_USERS_URL = "https://api.twitch.tv/helix/users"
TWITCH_EVENTSUB_SUBSCRIPTIONS_URL = "https://api.twitch.tv/helix/eventsub/subscriptions"
TWITCH_EVENTSUB_WS_URL = "wss://eventsub.wss.twitch.tv/ws"

# OAuth scope requested on login, and the EventSub subscription types created
# for every authorized channel.
HYPE_SCOPE = "channel:read:hype_train"
HYPE_TYPES = [
    "channel.hype_train.begin",
    "channel.hype_train.progress",
    "channel.hype_train.end",
]

app = FastAPI(title="Twitch EventSub Hype Train Bridge")

# Connected PWA WebSocket clients and, per client, the channels it asked to watch.
# An empty filter set means the client receives events for every channel.
clients: Set[WebSocket] = set()
client_filters: Dict[WebSocket, Set[str]] = {}
# Background task running eventsub_loop(); created on startup, cancelled on shutdown.
eventsub_task: Optional[asyncio.Task] = None
# Session id taken from the most recent EventSub "session_welcome" message.
eventsub_session_id: Optional[str] = None
# Keys of the form "<type>:<broadcaster_id>:<session_id>" for subscriptions that
# were already requested, so they are not requested again.
current_subscriptions: Set[str] = set()


def ensure_config() -> None:
    """Raise ``RuntimeError`` unless both Twitch app credentials are set.

    Raises:
        RuntimeError: if ``TWITCH_CLIENT_ID`` or ``TWITCH_CLIENT_SECRET`` is empty.
    """
    credentials_present = bool(TWITCH_CLIENT_ID) and bool(TWITCH_CLIENT_SECRET)
    if credentials_present:
        return
    raise RuntimeError("TWITCH_CLIENT_ID and TWITCH_CLIENT_SECRET must be configured.")


def load_tokens() -> Dict[str, Any]:
    """Read the token store and return its contents as a dict.

    Returns:
        The parsed JSON object, or an empty dict when the file does not exist,
        cannot be read or decoded, or does not contain a JSON object.
    """
    try:
        # EAFP: reading directly avoids the window between an exists() check
        # and the actual read.
        data = json.loads(TOKEN_STORE_PATH.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return {}
    except (OSError, ValueError):
        # ValueError covers both invalid JSON and invalid UTF-8.
        log.exception("Could not read token store")
        return {}

    if not isinstance(data, dict):
        # Callers use .get()/.items() on the result; a list or scalar would
        # crash them, so treat it like an unreadable store.
        log.error("Token store %s does not contain a JSON object; ignoring it", TOKEN_STORE_PATH)
        return {}
    return data


def save_tokens(data: Dict[str, Any]) -> None:
    """Persist the token store atomically with owner-only permissions.

    The data is written to a sibling ``.tmp`` file which then replaces the
    real store, so readers never observe a half-written file.

    Args:
        data: Mapping of channel login to token entry; must be JSON-serializable.

    Raises:
        OSError: if the directory or file cannot be created, written or replaced.
    """
    TOKEN_STORE_PATH.parent.mkdir(parents=True, exist_ok=True)
    tmp = TOKEN_STORE_PATH.with_suffix(".tmp")
    payload = json.dumps(data, ensure_ascii=False, indent=2)

    # Create the temp file as 0o600 from the start: it contains OAuth secrets
    # and must not be readable by others even briefly.
    fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write(payload)

    try:
        # The mode passed to os.open only applies when the file is created;
        # tighten a leftover temp file that already existed with wider access.
        os.chmod(tmp, 0o600)
    except OSError:
        # Best effort (e.g. filesystems without POSIX modes), but not silent.
        log.warning("Could not restrict permissions on %s", tmp, exc_info=True)

    tmp.replace(TOKEN_STORE_PATH)


def normalize_channel(value: str) -> str:
    """Reduce a channel reference to its lowercase login form.

    Args:
        value: Channel name as typed by a user or delivered by Twitch; may be
            empty or ``None``.

    Returns:
        The lower-cased input with every character removed that is neither
        alphanumeric nor an underscore. Whitespace and ``@``/``#`` prefixes are
        dropped by that same filter.
    """
    lowered = (value or "").lower()
    kept = [char for char in lowered if char == "_" or char.isalnum()]
    return "".join(kept)


async def twitch_headers(access_token: str) -> Dict[str, str]:
    """Build the request headers for an authenticated Twitch API call.

    Args:
        access_token: User access token sent as the Bearer credential.

    Returns:
        Headers carrying the client id, the Bearer authorization and a JSON
        content type.
    """
    bearer_credential = f"Bearer {access_token}"
    headers: Dict[str, str] = {}
    headers["Client-ID"] = TWITCH_CLIENT_ID
    headers["Authorization"] = bearer_credential
    headers["Content-Type"] = "application/json"
    return headers


async def exchange_code_for_token(code: str) -> Dict[str, Any]:
    """Trade an OAuth authorization code for a token response.

    Args:
        code: Authorization code received on the redirect from Twitch.

    Returns:
        The decoded JSON body of the token endpoint response.

    Raises:
        RuntimeError: if the client credentials are not configured.
        httpx.HTTPStatusError: if the token endpoint answers with an error status.
    """
    ensure_config()
    form_fields = {
        "client_id": TWITCH_CLIENT_ID,
        "client_secret": TWITCH_CLIENT_SECRET,
        "code": code,
        "grant_type": "authorization_code",
        "redirect_uri": TWITCH_REDIRECT_URI,
    }
    async with httpx.AsyncClient(timeout=15) as http:
        reply = await http.post(TWITCH_TOKEN_URL, data=form_fields)
        reply.raise_for_status()
        return reply.json()


async def refresh_token(refresh_token_value: str) -> Dict[str, Any]:
    """Request a new token set using a refresh token.

    Args:
        refresh_token_value: Refresh token previously issued for the channel.

    Returns:
        The decoded JSON body of the token endpoint response.

    Raises:
        RuntimeError: if the client credentials are not configured.
        httpx.HTTPStatusError: if the token endpoint answers with an error status.
    """
    ensure_config()
    form_fields = {
        "client_id": TWITCH_CLIENT_ID,
        "client_secret": TWITCH_CLIENT_SECRET,
        "grant_type": "refresh_token",
        "refresh_token": refresh_token_value,
    }
    async with httpx.AsyncClient(timeout=15) as http:
        reply = await http.post(TWITCH_TOKEN_URL, data=form_fields)
        reply.raise_for_status()
        return reply.json()


async def validate_token(access_token: str) -> Dict[str, Any]:
    """Ask the Twitch validate endpoint whether an access token is still good.

    Args:
        access_token: Token to check.

    Returns:
        The decoded JSON body of the validate response.

    Raises:
        httpx.HTTPStatusError: if the endpoint answers with an error status.
    """
    oauth_header = {"Authorization": f"OAuth {access_token}"}
    async with httpx.AsyncClient(timeout=15) as http:
        reply = await http.get(TWITCH_VALIDATE_URL, headers=oauth_header)
        reply.raise_for_status()
        return reply.json()


async def get_user_by_token(access_token: str) -> Dict[str, Any]:
    """Fetch the Twitch user record that belongs to an access token.

    Args:
        access_token: User access token used to authenticate the lookup.

    Returns:
        The first entry of the ``data`` list in the users response.

    Raises:
        httpx.HTTPStatusError: if the users endpoint answers with an error status.
        RuntimeError: if the response lists no user.
    """
    async with httpx.AsyncClient(timeout=15) as http:
        auth_headers = await twitch_headers(access_token)
        reply = await http.get(TWITCH_USERS_URL, headers=auth_headers)
        reply.raise_for_status()
        matches = reply.json().get("data", [])
        if matches:
            return matches[0]
        raise RuntimeError("No Twitch user returned.")


async def get_valid_token(channel_login: str) -> Optional[Dict[str, Any]]:
    """Return a usable token entry for a channel, refreshing it when needed.

    Args:
        channel_login: Key of the channel in the token store.

    Returns:
        The stored entry when its access token validates, the refreshed (and
        persisted) entry when a refresh succeeded, or ``None`` when the channel
        is unknown, has no refresh token, or the refresh failed.
    """
    entry = load_tokens().get(channel_login)
    if not entry:
        return None

    access_token = entry.get("access_token")
    refresh_token_value = entry.get("refresh_token")

    if access_token:
        try:
            await validate_token(access_token)
            return entry
        except Exception:
            # Deliberately broad: any validation failure falls through to a
            # refresh attempt instead of aborting the caller.
            log.info("Token for %s needs refresh", channel_login)

    if not refresh_token_value:
        return None

    try:
        refreshed = await refresh_token(refresh_token_value)
        entry.update({
            "access_token": refreshed["access_token"],
            "refresh_token": refreshed.get("refresh_token", refresh_token_value),
            "scope": refreshed.get("scope", entry.get("scope", [])),
            "updated_at": int(time.time()),
        })
        # Re-read the store right before writing. The awaits above may have let
        # another coroutine save changes (e.g. a newly authorized channel), and
        # writing back a snapshot taken before them would silently drop those.
        # No await happens between this load and the save.
        tokens = load_tokens()
        tokens[channel_login] = entry
        save_tokens(tokens)
        return entry
    except Exception:
        log.exception("Could not refresh token for %s", channel_login)
        return None


async def create_hype_subscription(subscription_type: str, broadcaster_id: str, access_token: str, session_id: str) -> bool:
    """Ask Twitch to create one hype-train EventSub subscription.

    Args:
        subscription_type: EventSub subscription type, e.g. one of ``HYPE_TYPES``.
        broadcaster_id: Twitch user id of the channel to subscribe to.
        access_token: User access token of that channel.
        session_id: EventSub WebSocket session that should receive the events.

    Returns:
        ``True`` when Twitch answered 202 or 409 (both are treated as "the
        subscription is in place"), ``False`` for any other status.

    Raises:
        httpx.HTTPError: if the request itself fails (network error, timeout).
    """
    body = {
        "type": subscription_type,
        "version": "2",
        "condition": {"broadcaster_user_id": broadcaster_id},
        "transport": {"method": "websocket", "session_id": session_id},
    }
    async with httpx.AsyncClient(timeout=20) as client:
        response = await client.post(
            TWITCH_EVENTSUB_SUBSCRIPTIONS_URL,
            headers=await twitch_headers(access_token),
            json=body,
        )
        if response.status_code in (202, 409):
            log.info("Subscription %s for %s OK: %s", subscription_type, broadcaster_id, response.status_code)
            return True
        log.warning("Subscription failed: %s %s", response.status_code, response.text)
        return False


async def subscribe_authorized_channels(session_id: str) -> None:
    """Create the hype-train subscriptions for every channel in the token store.

    Channels without a usable token or without a stored user id are skipped.
    Subscriptions already recorded in ``current_subscriptions`` for this
    session are not requested again.

    Args:
        session_id: EventSub WebSocket session that should receive the events.

    Raises:
        httpx.HTTPError: if a subscription request fails at the transport level.
    """
    # Snapshot the logins: the store may be rewritten while this coroutine awaits.
    for login in list(load_tokens()):
        clean_login = normalize_channel(login)
        token_entry = await get_valid_token(clean_login)
        if not token_entry:
            continue

        broadcaster_id = str(token_entry.get("user_id") or "")
        access_token = token_entry.get("access_token")
        if not broadcaster_id or not access_token:
            continue

        for sub_type in HYPE_TYPES:
            key = f"{sub_type}:{broadcaster_id}:{session_id}"
            if key in current_subscriptions:
                continue
            # Record only subscriptions Twitch accepted; a rejected one stays
            # unrecorded so the next call tries it again.
            if await create_hype_subscription(sub_type, broadcaster_id, access_token, session_id):
                current_subscriptions.add(key)


def _first_present(event: Dict[str, Any], *keys: str, default: Any = 0) -> Any:
    """Return the first value among ``keys`` that is present and not ``None``."""
    for key in keys:
        value = event.get(key)
        if value is not None:
            return value
    return default


def map_hype_event(message: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Translate an EventSub notification into the payload sent to PWA clients.

    Args:
        message: Decoded EventSub WebSocket message.

    Returns:
        A ``{"kind": "hype_train", "event": {...}}`` dict, or ``None`` when the
        message does not belong to a ``channel.hype_train.*`` subscription.
    """
    # `or {}` also covers keys that are present but null.
    payload = message.get("payload") or {}
    subscription = payload.get("subscription") or {}
    event = payload.get("event") or {}
    sub_type = subscription.get("type") or ""

    if not sub_type.startswith("channel.hype_train."):
        return None

    event_type = sub_type.rsplit(".", 1)[-1]
    channel = normalize_channel(event.get("broadcaster_user_login") or event.get("broadcaster_user_name") or "")

    # Twitch field names may evolve; keep the mapping tolerant.
    # Only a missing/null field triggers the fallback: a progress of 0 is a
    # real value and must not be replaced by the running total.
    progress = _first_present(event, "progress", "total", default=0)
    goal = _first_present(event, "goal", "target", default=0)

    # Version 2 payloads describe the train kind in `type` instead of the
    # older boolean flag; derive the flag when it is absent.
    is_golden_kappa = event.get("is_golden_kappa_train")
    if is_golden_kappa is None:
        is_golden_kappa = event.get("type") == "golden_kappa"

    return {
        "kind": "hype_train",
        "event": {
            "type": event_type,
            "channel": channel,
            "broadcaster_user_id": event.get("broadcaster_user_id"),
            "broadcaster_user_login": event.get("broadcaster_user_login"),
            "broadcaster_user_name": event.get("broadcaster_user_name"),
            "level": event.get("level", 1),
            "progress": progress,
            "goal": goal,
            "started_at": event.get("started_at"),
            "expires_at": event.get("expires_at"),
            "ended_at": event.get("ended_at"),
            "cooldown_ends_at": event.get("cooldown_ends_at"),
            "is_golden_kappa_train": is_golden_kappa,
            "top_contributions": event.get("top_contributions", []),
            "last_contribution": event.get("last_contribution"),
        },
    }


async def broadcast_event(data: Dict[str, Any]) -> None:
    """Send a mapped event to every connected client that wants it.

    A client receives the event when it has no channel filter, when the event
    carries no channel, or when the event's channel is in the client's filter.
    Clients whose send fails are removed from the registry afterwards.

    Args:
        data: Payload to serialize; its ``event.channel`` decides the routing.
    """
    target_channel = normalize_channel(data.get("event", {}).get("channel", ""))
    if not clients:
        return

    encoded = json.dumps(data, ensure_ascii=False)
    failed: List[WebSocket] = []
    # Iterate over a snapshot: the registry can change while a send is awaited.
    for socket in tuple(clients):
        wanted = client_filters.get(socket, set())
        deliver = not wanted or not target_channel or target_channel in wanted
        if not deliver:
            continue
        try:
            await socket.send_text(encoded)
        except Exception:
            failed.append(socket)

    for socket in failed:
        clients.discard(socket)
        client_filters.pop(socket, None)


async def eventsub_loop() -> None:
    """Keep an EventSub WebSocket session alive and forward hype-train events.

    Runs until cancelled. Each iteration connects to Twitch, subscribes all
    authorized channels once the welcome message delivers the session id, and
    broadcasts mapped notifications to the PWA clients. When the connection
    ends for any reason the loop waits five seconds and connects again.

    On ``session_reconnect`` the announced reconnect URL is only logged; the
    loop leaves the current connection and opens a fresh session at the
    default URL, re-creating the subscriptions on the next welcome.

    Raises:
        asyncio.CancelledError: propagated when the task is cancelled.
    """
    global eventsub_session_id, current_subscriptions
    while True:
        try:
            ensure_config()
            log.info("Connecting to Twitch EventSub WebSocket")
            async with websockets.connect(TWITCH_EVENTSUB_WS_URL, ping_interval=30, ping_timeout=10) as ws:
                async for raw in ws:
                    message = json.loads(raw)
                    msg_type = message.get("metadata", {}).get("message_type")

                    if msg_type == "session_welcome":
                        eventsub_session_id = message["payload"]["session"]["id"]
                        current_subscriptions = set()
                        log.info("EventSub session welcome: %s", eventsub_session_id)
                        await subscribe_authorized_channels(eventsub_session_id)

                    elif msg_type == "session_reconnect":
                        reconnect_url = message.get("payload", {}).get("session", {}).get("reconnect_url")
                        log.info("EventSub requested reconnect: %s", reconnect_url)
                        break

                    elif msg_type == "notification":
                        mapped = map_hype_event(message)
                        if mapped:
                            await broadcast_event(mapped)

                    elif msg_type == "revocation":
                        log.warning("EventSub subscription revoked: %s", message)

        except asyncio.CancelledError:
            raise
        except Exception:
            log.exception("EventSub loop crashed; reconnecting soon")
        finally:
            # The session ends with its connection. Clear the id so status
            # endpoints stop reporting "connected" and no handler subscribes
            # against a dead session until the next welcome arrives.
            eventsub_session_id = None
            current_subscriptions = set()

        await asyncio.sleep(5)


@app.on_event("startup")
async def startup() -> None:
    """Start the EventSub listener as a background task when the app boots."""
    global eventsub_task
    listener = eventsub_loop()
    eventsub_task = asyncio.create_task(listener)


@app.on_event("shutdown")
async def shutdown() -> None:
    """Cancel the EventSub listener and wait until it has actually stopped."""
    if eventsub_task is None:
        return
    eventsub_task.cancel()
    # cancel() only requests cancellation. Waiting lets the task unwind (and
    # close its WebSocket) before the event loop goes away. return_exceptions
    # keeps the task's own CancelledError from surfacing here.
    await asyncio.gather(eventsub_task, return_exceptions=True)


@app.get("/")
async def index() -> HTMLResponse:
    """Render the overview page with the authorize link and authorized channels.

    Returns:
        An HTML page listing every channel in the token store with its Twitch id.
    """
    # Imported locally; the local page variable is named `page` so it does not
    # collide with the stdlib `html` module.
    from html import escape

    tokens = load_tokens()
    rows = []
    for login, entry in sorted(tokens.items()):
        # Values come from a file on disk; escape them before they reach HTML.
        safe_login = escape(str(login))
        safe_user_id = escape(str(entry.get("user_id", "?")))
        rows.append(f"<li><b>#{safe_login}</b> — Twitch ID {safe_user_id}</li>")
    auth_url = "/twitch-events/auth/login"
    page = f"""
    <!doctype html>
    <html lang="de">
    <head>
      <meta charset="utf-8">
      <meta name="viewport" content="width=device-width, initial-scale=1">
      <title>Twitch EventSub Bridge</title>
      <style>
        body {{ font-family: -apple-system, BlinkMacSystemFont, Segoe UI, sans-serif; background:#05050a; color:white; padding:24px; }}
        a {{ color:#b58cff; font-weight:800; }}
        code {{ background:rgba(255,255,255,.08); padding:2px 6px; border-radius:6px; }}
      </style>
    </head>
    <body>
      <h1>Twitch EventSub Bridge</h1>
      <p>Status: läuft.</p>
      <p><a href="{auth_url}">Twitch Channel für Hype-Train autorisieren</a></p>
      <h2>Autorisierte Channels</h2>
      <ul>{''.join(rows) if rows else '<li>Noch keine Channels autorisiert.</li>'}</ul>
      <p>PWA-WebSocket: <code>/twitch-events/ws</code></p>
    </body>
    </html>
    """
    return HTMLResponse(page)


@app.get("/health")
async def health() -> Dict[str, Any]:
    """Report the service state for monitoring.

    Returns:
        A dict with a constant ``ok`` flag, the current EventSub session id,
        the logins in the token store and the number of connected clients.
    """
    report: Dict[str, Any] = {"ok": True}
    report["eventsub_session_id"] = eventsub_session_id
    report["authorized_channels"] = list(load_tokens().keys())
    report["clients"] = len(clients)
    return report


@app.get("/auth/login")
async def auth_login() -> RedirectResponse:
    """Redirect the browser to the Twitch authorization page.

    Returns:
        A redirect to the Twitch authorize URL requesting the hype-train scope.

    Raises:
        RuntimeError: if the client credentials are not configured.
    """
    ensure_config()
    query_string = urlencode({
        "client_id": TWITCH_CLIENT_ID,
        "redirect_uri": TWITCH_REDIRECT_URI,
        "response_type": "code",
        "scope": HYPE_SCOPE,
        "force_verify": "true",
    })
    return RedirectResponse(f"{TWITCH_AUTH_URL}?{query_string}")


@app.get("/auth/callback")
async def auth_callback(
    code: Optional[str] = Query(None),
    error: Optional[str] = Query(None),
    error_description: Optional[str] = Query(None),
) -> HTMLResponse:
    """Finish the OAuth flow: store the channel's tokens and subscribe it.

    Args:
        code: Authorization code appended by Twitch on success.
        error: Error code appended by Twitch when authorization was refused.
        error_description: Human-readable detail accompanying ``error``.

    Returns:
        A confirmation page on success, or a 400 page when Twitch reported an
        error, no code was supplied, or the code could not be exchanged.

    Raises:
        RuntimeError: if the client credentials are not configured or Twitch
            returns no user for the token.
    """
    from html import escape

    def failure_page(reason: str) -> HTMLResponse:
        # `reason` may originate from the query string, so it is escaped.
        return HTMLResponse(f"""
    <!doctype html>
    <html lang="de">
    <head><meta charset="utf-8"><meta name="viewport" content="width=device-width, initial-scale=1"></head>
    <body style="font-family:-apple-system,BlinkMacSystemFont,Segoe UI,sans-serif;background:#05050a;color:white;padding:24px;">
      <h1>Autorisierung fehlgeschlagen</h1>
      <p>{escape(reason)}</p>
      <p><a style="color:#b58cff;font-weight:800;" href="/twitch-events/auth/login">Erneut versuchen</a></p>
    </body>
    </html>
    """, status_code=400)

    # A refused authorization arrives as ?error=...&error_description=... with
    # no code; answer with a readable page instead of a validation error.
    if error or not code:
        return failure_page(error_description or error or "Kein Autorisierungscode erhalten.")

    try:
        token = await exchange_code_for_token(code)
        access_token = token["access_token"]
        user = await get_user_by_token(access_token)
        login = normalize_channel(user["login"])
        user_id = user["id"]
    except (httpx.HTTPError, KeyError, ValueError):
        # Typical cause: an expired or reused code rejected by Twitch.
        log.exception("Twitch authorization callback failed")
        return failure_page("Twitch hat die Autorisierung nicht bestätigt. Bitte erneut versuchen.")

    tokens = load_tokens()
    tokens[login] = {
        "login": login,
        "display_name": user.get("display_name"),
        "user_id": user_id,
        "access_token": access_token,
        "refresh_token": token.get("refresh_token"),
        "scope": token.get("scope", []),
        "updated_at": int(time.time()),
    }
    save_tokens(tokens)

    if eventsub_session_id:
        try:
            await subscribe_authorized_channels(eventsub_session_id)
        except Exception:
            # The tokens are already stored; the subscriptions are attempted
            # again on the next EventSub welcome, so do not fail the request.
            log.exception("Could not subscribe %s right after authorization", login)

    return HTMLResponse(f"""
    <!doctype html>
    <html lang="de">
    <head><meta charset="utf-8"><meta name="viewport" content="width=device-width, initial-scale=1"></head>
    <body style="font-family:-apple-system,BlinkMacSystemFont,Segoe UI,sans-serif;background:#05050a;color:white;padding:24px;">
      <h1>Autorisiert</h1>
      <p>#{login} ist jetzt für Hype-Train-Events autorisiert.</p>
      <p><a style="color:#b58cff;font-weight:800;" href="/twitch-events/">Zurück zur Übersicht</a></p>
    </body>
    </html>
    """)


@app.get("/api/status")
async def api_status(channels: str = "") -> JSONResponse:
    """Report which of the requested channels are authorized.

    Args:
        channels: Comma-separated channel names; names that normalize to an
            empty string are ignored.

    Returns:
        JSON with the authorized channels, the normalized requested channels,
        the requested channels lacking authorization, and whether an EventSub
        session id is currently set.
    """
    # filter(None, ...) drops the names that normalize to an empty string.
    requested = set(filter(None, map(normalize_channel, channels.split(","))))
    authorized = set(load_tokens().keys())
    unauthorized = requested - authorized
    body = {
        "authorized_channels": sorted(authorized),
        "requested_channels": sorted(requested),
        "missing_authorization": sorted(unauthorized),
        "eventsub_connected": bool(eventsub_session_id),
    }
    return JSONResponse(body)


@app.websocket("/ws")
async def pwa_ws(websocket: WebSocket) -> None:
    """Serve one PWA client: greet it, then process its ``watch`` requests.

    After accepting the socket a ``hello`` message with the authorized channels
    is sent. Each ``{"type": "watch", "channels": [...]}`` message replaces the
    client's channel filter, is acknowledged with a ``watching`` message, and
    triggers a subscription pass when an EventSub session exists. Messages
    that are not JSON objects or have another type are ignored.

    Args:
        websocket: The client connection handed in by FastAPI.
    """
    await websocket.accept()
    clients.add(websocket)
    client_filters[websocket] = set()

    try:
        # Sent inside the try block so that a client vanishing during the
        # greeting is still unregistered by the finally clause.
        await websocket.send_text(json.dumps({
            "kind": "hello",
            "authorized_channels": list(load_tokens().keys()),
        }, ensure_ascii=False))

        while True:
            raw = await websocket.receive_text()
            try:
                data = json.loads(raw)
            except (ValueError, RecursionError):
                continue

            # Client input is untrusted: anything but a JSON object asking to
            # "watch" is ignored rather than allowed to raise.
            if not isinstance(data, dict) or data.get("type") != "watch":
                continue

            requested = data.get("channels", [])
            if isinstance(requested, str):
                # Accept a comma-separated string like the /api/status endpoint.
                requested = requested.split(",")
            elif not isinstance(requested, list):
                requested = []

            normalized = (normalize_channel(c) for c in requested if isinstance(c, str))
            channels = {name for name in normalized if name}
            client_filters[websocket] = channels
            await websocket.send_text(json.dumps({
                "kind": "watching",
                "channels": sorted(channels),
                "authorized_channels": list(load_tokens().keys()),
            }, ensure_ascii=False))

            if eventsub_session_id:
                try:
                    await subscribe_authorized_channels(eventsub_session_id)
                except Exception:
                    # A Twitch or storage hiccup must not drop this client.
                    log.exception("Could not refresh subscriptions after watch request")

    except WebSocketDisconnect:
        pass
    finally:
        clients.discard(websocket)
        client_filters.pop(websocket, None)
