import xml.etree.ElementTree as ET import httpx from fastapi import FastAPI, Request from fastapi.responses import Response from starlette.responses import JSONResponse from client import AsyncCustomHost, NameSolver from const import OFFICIAL_API, FEATURES_DICT, ENTITLEMENTS, CURRENT_DATE_MINUS_ONE_DAY, FEATURES, \ CURRENT_DATE_MINUS_ONE_DAY_Z, NEXT_MONTH_DATE_Z app = FastAPI() # Configure upstream client official_client = httpx.AsyncClient( verify=True, timeout=httpx.Timeout(30.0, connect=10.0), limits=httpx.Limits(max_keepalive_connections=50, max_connections=100), transport=AsyncCustomHost(NameSolver()) ) # Hop-by-hop headers we must strip HOP_BY_HOP_HEADERS = { "connection", "keep-alive", "proxy-authenticate", "proxy-authorization", "te", "trailer", "transfer-encoding", "upgrade", "content-length", # let httpx calculate "host", # must be rewritten } async def call_official(request: Request, path: str) -> httpx.Response: """Forward the incoming request to the official API and return the response.""" # Copy request body body = await request.body() # Copy headers, filtering hop-by-hop req_headers = { k: v for k, v in request.headers.items() if k.lower() not in HOP_BY_HOP_HEADERS } # Rewrite Host header to match upstream req_headers["host"] = httpx.URL(OFFICIAL_API).host # Forward request upstream upstream_response = await official_client.request( method=request.method, url=f"{OFFICIAL_API}/{path}", headers=req_headers, content=body, params=request.query_params, cookies=request.cookies, ) # Filter response headers hop-by-hop resp_headers = { k: v for k, v in upstream_response.headers.items() if k.lower() not in HOP_BY_HOP_HEADERS } upstream_response.headers = resp_headers return upstream_response async def return_edited_response( response: httpx.Response, new_content: dict | bytes | None = None ) -> Response: """ Return possibly modified response. If new_content is a dict and the response is JSON, it will merge/override keys. If new_content and response data are lists, new_content will replace the entire response data. If new_content is bytes or str, it will replace the response body entirely. """ content_type = response.headers.get("content-type", "") response.headers.pop("content-encoding", None) # body is already decoded if content_type.startswith("application/json"): data = response.json() if isinstance(new_content, dict): # allow overrides data = {**data, **new_content} if isinstance(data, list) and isinstance(new_content, list): data = new_content # replace entire list return JSONResponse( content=data, status_code=response.status_code, headers=response.headers ) else: content = new_content if isinstance(new_content, (bytes, str)) else response.content return Response( content=content, status_code=response.status_code, headers=response.headers, media_type=content_type ) @app.get("/api/v2/home") async def fake_get_home(request: Request): upstream_response = await call_official(request, request.url.path.lstrip("/")) data_override = { "subscription": True, } return await return_edited_response(upstream_response, data_override) @app.get("/api/v2/features") async def fake_get_features(request: Request): upstream_response = await call_official(request, request.url.path.lstrip("/")) data_override = FEATURES_DICT return await return_edited_response(upstream_response, data_override) @app.get("/api/v2/user") async def fake_get_user(request: Request): upstream_response = await call_official(request, request.url.path.lstrip("/")) data_override = None content_type = upstream_response.headers.get("content-type", "") if content_type.startswith("application/json"): data_override = { "subscriptionDescription": "Monthly Plex Pass", "roles": ["plexpass"], "entitlements": ENTITLEMENTS, "subscription": { "active": True, "subscribedAt": CURRENT_DATE_MINUS_ONE_DAY, "status": "Active", "paymentService": "braintree", "plan": "monthly", "paymentNotificationId": "1234567", "features": FEATURES_DICT } } elif content_type.startswith("application/xml"): data = ET.fromstring(upstream_response.content) data.attrib["subscriptionDescription"] = "Monthly Plex Pass" subscription = data.find("subscription") if subscription is not None: subscription.set("active", "1") subscription.set("subscribedAt", CURRENT_DATE_MINUS_ONE_DAY) subscription.set("status", "Active") subscription.set("paymentService", "braintree") subscription.set("plan", "monthly") subscription.set("paymentNotificationId", "3421431") features = subscription.find("features") if features is not None: features.clear() for feature in FEATURES: ET.SubElement(features, "feature", {"id": feature}) subscriptions = data.find("subscriptions") if subscriptions is not None: subscriptions.clear() sub = ET.SubElement(subscriptions, "subscription", { "id": "1234567", "mode": "monthly", "startsAt": CURRENT_DATE_MINUS_ONE_DAY_Z, "renewsAt": NEXT_MONTH_DATE_Z, "endsAt": "", "canceled": "0", "gracePeriod": "0", "onHold": "0", "canReactivate": "0", "canUpgrade": "0", "canDowngrade": "0", "canConvert": "0", "type": "plexpass", "braintreeId": "xyzxyz", "state": "active" }) billing = ET.SubElement(sub, "billing") ET.SubElement(billing, "paymentMethod", {"id": "1234567"}) ET.SubElement(billing, "internalPaymentMethod") # --- Replace --- for ents in data.findall("entitlements"): data.remove(ents) entitlements = ET.SubElement(data, "entitlements") for entitlement in ENTITLEMENTS: ET.SubElement(entitlements, "entitlement", {"id": entitlement}) # --- Replace --- for roles in data.findall("roles"): data.remove(roles) roles = ET.SubElement(data, "roles") ET.SubElement(roles, "role", {"id": "plexpass"}) # Pretty print XML ET.indent(data) data_override = ET.tostring(data, encoding="utf-8", method="xml") return await return_edited_response(upstream_response, data_override) @app.post("/api/v2/users/signin") async def fake_user_signin(request: Request): upstream_response = await call_official(request, request.url.path.lstrip("/")) data_override = None content_type = upstream_response.headers.get("content-type", "") if content_type.startswith("application/json"): data_override = { "subscriptionDescription": "Monthly Plex Pass", "roles": ["plexpass"], "entitlements": ENTITLEMENTS, "subscription": { "active": True, "subscribedAt": CURRENT_DATE_MINUS_ONE_DAY_Z, "status": "Active", "paymentService": "braintree", "plan": "monthly", "paymentNotificationId": "1234567", "canUpgrade": True, "features": FEATURES_DICT }, "subscriptions": [ { "id": 1234567, "mode": "monthly", "startsAt": 1755963427, "renewsAt": 1758585600, "endsAt": None, "billing": { "paymentMethodId": 1234567, "internalPaymentMethod": {} }, "canceled": False, "gracePeriod": False, "onHold": False, "canReactivate": False, "canUpgrade": False, "canDowngrade": False, "canConvert": False, "type": "plexpass", "braintreeId": "xyzxyz", "state": "active" } ] } return await return_edited_response(upstream_response, data_override) @app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"]) async def catch_all(request: Request, path: str): upstream_response = await call_official(request, path) return await return_edited_response(upstream_response)