"""Proxy routes that forward requests upstream and rewrite selected responses.

Every handler forwards the incoming request with ``call_official`` and passes
the upstream response, together with an optional override payload, to
``return_edited_response``.  Both helpers live in ``utils`` and are not shown
here; comments about how they treat the override are assumptions to confirm
against that module.
"""
import xml.etree.ElementTree as ET
from urllib.parse import parse_qs

from fastapi import FastAPI, Request
from starlette.responses import FileResponse

from const import (
    CURRENT_DATE_MINUS_ONE_DAY,
    CURRENT_DATE_MINUS_ONE_DAY_Z,
    ENTITLEMENTS,
    FEATURES,
    FEATURES_DICT,
    NEXT_MONTH_DATE_Z,
    PLEXAMP_FEATURES,
    PLEXAMP_SUBSCRIPTION_XML,
    TIMESTAMP_CURRENT_MINUS_30MIN,
    TIMESTAMP_CURRENT_PLUS_30DAYS,
    Hosts,
)
from utils import call_official, fake_response, host_required, return_edited_response

app = FastAPI()

# Content-type prefixes the handlers know how to rewrite.
_JSON_CONTENT_TYPE = "application/json"
_XML_CONTENT_TYPE = "application/xml"

# Hostnames for which the catch-all route answers locally instead of proxying.
_LOCAL_HOSTNAMES = ("127.0.0.1", "localhost", "0.0.0.0")


def _upstream_path(request: Request) -> str:
    """Return the request path without leading slashes, as passed to ``call_official``."""
    return request.url.path.lstrip("/")


def _content_type(upstream_response) -> str:
    """Return the upstream ``content-type`` header, or ``""`` when it is absent."""
    return upstream_response.headers.get("content-type", "")


def _replace_child(root: ET.Element, tag: str, attrib: dict | None = None) -> ET.Element:
    """Drop every direct child of *root* named *tag* and append one fresh, empty child.

    ``findall`` returns a list, so removing while looping over it is safe.
    """
    for stale in root.findall(tag):
        root.remove(stale)
    return ET.SubElement(root, tag, attrib or {})


def _grant_plexpass_xml(root: ET.Element, entitlements_attrib: dict | None = None) -> None:
    """Replace the ``entitlements`` and ``roles`` children of *root* in place.

    ``entitlements`` gets one ``entitlement`` child per id in ``ENTITLEMENTS``;
    ``roles`` gets a single ``plexpass`` role.  *entitlements_attrib* is applied
    to the new ``entitlements`` element itself.
    """
    entitlements = _replace_child(root, "entitlements", entitlements_attrib)
    for entitlement in ENTITLEMENTS:
        ET.SubElement(entitlements, "entitlement", {"id": entitlement})

    roles = _replace_child(root, "roles")
    ET.SubElement(roles, "role", {"id": "plexpass"})


def _to_xml_bytes(root: ET.Element) -> bytes:
    """Pretty-print *root* in place and return it serialized as UTF-8 XML bytes."""
    ET.indent(root)
    return ET.tostring(root, encoding="utf-8", method="xml")


@app.get("/api/v2/home", tags=[Hosts.CLIENTS.value])
async def fake_get_home(request: Request, _=host_required([Hosts.CLIENTS])):
    """Proxy ``/api/v2/home`` with ``subscription`` overridden to ``True``."""
    upstream_response = await call_official(request, _upstream_path(request))
    data_override = {
        "subscription": True,
    }
    return await return_edited_response(upstream_response, data_override)


@app.get("/api/v2/features", tags=[Hosts.CLIENTS.value, Hosts.PLEX.value])
async def fake_get_features(request: Request, _=host_required([Hosts.CLIENTS, Hosts.PLEX])):
    """Proxy ``/api/v2/features`` with ``FEATURES_DICT`` as the override payload."""
    upstream_response = await call_official(request, _upstream_path(request))
    return await return_edited_response(upstream_response, FEATURES_DICT)


@app.get("/api/v2/user", tags=[Hosts.CLIENTS.value, Hosts.PLEX.value])
async def fake_get_user(request: Request, _=host_required([Hosts.CLIENTS, Hosts.PLEX])):
    """Proxy ``/api/v2/user`` and override its subscription data.

    JSON responses get a dict override; XML responses are parsed, edited and
    re-serialized to bytes.  Any other content type is passed on with a
    ``None`` override.
    """
    upstream_response = await call_official(request, _upstream_path(request))
    data_override = None
    content_type = _content_type(upstream_response)

    if content_type.startswith(_JSON_CONTENT_TYPE):
        data_override = {
            "subscriptionDescription": "Monthly Plex Pass",
            "roles": ["plexpass"],
            "entitlements": ENTITLEMENTS,
            "subscription": {
                "active": True,
                "subscribedAt": CURRENT_DATE_MINUS_ONE_DAY,
                "status": "Active",
                "paymentService": "braintree",
                "plan": "monthly",
                "paymentNotificationId": "1234567",
                "features": FEATURES_DICT,
            },
        }
    elif content_type.startswith(_XML_CONTENT_TYPE):
        data = ET.fromstring(upstream_response.content)
        data.attrib["subscriptionDescription"] = "Monthly Plex Pass"

        subscription = data.find("subscription")
        if subscription is not None:
            # Same attributes, in the same order, as the JSON "subscription" object.
            subscription_attrs = {
                "active": "1",
                "subscribedAt": CURRENT_DATE_MINUS_ONE_DAY,
                "status": "Active",
                "paymentService": "braintree",
                "plan": "monthly",
                "paymentNotificationId": "1234567",
            }
            for key, value in subscription_attrs.items():
                subscription.set(key, value)

            features = subscription.find("features")
            if features is not None:
                # clear() also drops the element's attributes and text.
                features.clear()
                for feature in FEATURES:
                    ET.SubElement(features, "feature", {"id": feature})

        subscriptions = data.find("subscriptions")
        if subscriptions is not None:
            subscriptions.clear()
            sub = ET.SubElement(subscriptions, "subscription", {
                "id": "1234567",
                "mode": "monthly",
                "startsAt": CURRENT_DATE_MINUS_ONE_DAY_Z,
                "renewsAt": NEXT_MONTH_DATE_Z,
                "endsAt": "",
                "canceled": "0",
                "gracePeriod": "0",
                "onHold": "0",
                "canReactivate": "0",
                "canUpgrade": "0",
                "canDowngrade": "0",
                "canConvert": "0",
                "type": "plexpass",
                "braintreeId": "xyzxyz",
                "state": "active",
            })
            billing = ET.SubElement(sub, "billing")
            ET.SubElement(billing, "paymentMethod", {"id": "1234567"})
            ET.SubElement(billing, "internalPaymentMethod")

        _grant_plexpass_xml(data)
        data_override = _to_xml_bytes(data)

    return await return_edited_response(upstream_response, data_override)


@app.post("/api/v2/users/signin", tags=[Hosts.CLIENTS.value, Hosts.PLEX.value], name="Fake signin")
@app.get("/api/v2/user.json", tags=[Hosts.PLEX.value], name="Fake user.json", description="Used by Plexamp")
async def fake_user_json_or_signin(request: Request, _=host_required([Hosts.CLIENTS, Hosts.PLEX])):
    """Proxy the sign-in and ``user.json`` routes and override subscription data.

    Only JSON responses are edited.  The feature list is ``PLEXAMP_FEATURES``
    when the ``X-Plex-Product`` request header contains "plexamp"
    (case-insensitive), otherwise ``FEATURES_DICT``.
    """
    upstream_response = await call_official(request, _upstream_path(request))
    data_override = None

    if _content_type(upstream_response).startswith(_JSON_CONTENT_TYPE):
        product = request.headers.get("X-Plex-Product", "").lower()
        features = PLEXAMP_FEATURES if "plexamp" in product else FEATURES_DICT
        data_override = {
            "subscriptionDescription": "Monthly Plex Pass",
            "roles": ["plexpass"],
            "entitlements": ENTITLEMENTS,
            "subscription": {
                "active": True,
                "subscribedAt": CURRENT_DATE_MINUS_ONE_DAY_Z,
                "status": "Active",
                "paymentService": "braintree",
                "plan": "monthly",
                "paymentNotificationId": "1234567",
                "canUpgrade": True,
                "features": features,
            },
            "subscriptions": [
                {
                    "id": 1234567,
                    "mode": "monthly",
                    "startsAt": TIMESTAMP_CURRENT_MINUS_30MIN,
                    "renewsAt": TIMESTAMP_CURRENT_PLUS_30DAYS,
                    "endsAt": None,
                    "billing": {
                        "paymentMethodId": 1234567,
                        "internalPaymentMethod": {},
                    },
                    "canceled": False,
                    "gracePeriod": False,
                    "onHold": False,
                    "canReactivate": False,
                    "canUpgrade": False,
                    "canDowngrade": False,
                    "canConvert": False,
                    "type": "plexpass",
                    "braintreeId": "xyzxyz",
                    "state": "active",
                },
            ],
        }

    return await return_edited_response(upstream_response, data_override)


@app.post("/api/claim/exchange", tags=[Hosts.PLEX.value])
async def fake_claim_exchange(request: Request, _=host_required([Hosts.PLEX])):
    """Proxy ``/api/claim/exchange`` and rewrite its XML response.

    Only XML responses are edited: an existing ``subscription`` child is
    swapped for ``PLEXAMP_SUBSCRIPTION_XML``, and the ``entitlements`` (with
    ``all="1"``) and ``roles`` children are replaced.
    """
    upstream_response = await call_official(request, _upstream_path(request))
    data_override = None

    if _content_type(upstream_response).startswith(_XML_CONTENT_TYPE):
        data = ET.fromstring(upstream_response.content)

        subscription = data.find("subscription")
        if subscription is not None:
            data.remove(subscription)
            # NOTE(review): this appends the module-level object itself, so the
            # later ET.indent() call writes whitespace into a value shared by
            # every request — confirm that is acceptable, or append a copy.
            data.append(PLEXAMP_SUBSCRIPTION_XML)

        _grant_plexpass_xml(data, {"all": "1"})
        data_override = _to_xml_bytes(data)

    return await return_edited_response(upstream_response, data_override)


@app.post("/api/v2/user/webhooks", tags=[Hosts.CLIENTS.value])
async def fake_get_user_webhooks(request: Request, _=host_required([Hosts.CLIENTS])):
    """Proxy a webhook update and echo the submitted URLs back as the override.

    The request body is parsed as a URL-encoded form and every ``urls[]``
    value becomes a ``{"url": ...}`` entry.
    """
    # FIXME: this doesn't really work because webhooks are sent by Plex itself, so unless we implement a complete
    #  webhook emitter this is of little use, but at least it removes errors from the UI...
    upstream_response = await call_official(request, _upstream_path(request))
    form = parse_qs((await request.body()).decode("utf-8"))
    data_override = [{"url": url} for url in form.get("urls[]", [])]
    return await return_edited_response(upstream_response, data_override)


@app.get("/api/v2/user/webhooks", tags=[Hosts.CLIENTS.value])
async def fake_list_user_webhooks(request: Request, _=host_required([Hosts.CLIENTS])):
    """Return a fixed placeholder webhook list with the status forced to 200.

    See the FIXME in the POST ``/api/v2/user/webhooks`` handler for why
    webhooks are only stubbed.
    """
    upstream_response = await call_official(request, _upstream_path(request))
    upstream_response.status_code = 200
    data_override = [{"url": "https://webhooks.are.not.supported.because.plex.sends.them/not/your/server"}]
    # ignore_official_data presumably makes the helper discard the upstream body — confirm in utils.
    return await return_edited_response(upstream_response, data_override, ignore_official_data=True)


@app.post("/v1/initialize", tags=[Hosts.FEATURES.value])
@app.options("/v1/initialize", tags=[Hosts.FEATURES.value])
@app.post("/v1/rgstr", tags=[Hosts.FEATURES.value])
@app.options("/v1/rgstr", tags=[Hosts.FEATURES.value])
@app.post("/collect/event", tags=[Hosts.ANALYTICS.value])
@app.options("/collect/event", tags=[Hosts.ANALYTICS.value])
async def block_telemetry():
    """Answer the listed telemetry routes locally, without calling upstream."""
    return fake_response({"status": "disabled by proxy :)"}, status_code=200)


@app.post("/collect/server", tags=[Hosts.ANALYTICS.value])
async def fake_server_details(request: Request, _=host_required([Hosts.ANALYTICS])):
    """Proxy ``/collect/server`` and set ``plexPass="1"`` on the XML root element."""
    upstream_response = await call_official(request, _upstream_path(request))
    data_override = None

    if _content_type(upstream_response).startswith(_XML_CONTENT_TYPE):
        data = ET.fromstring(upstream_response.content)
        data.attrib['plexPass'] = '1'
        data_override = _to_xml_bytes(data)

    return await return_edited_response(upstream_response, data_override)


@app.get("/favicon.ico")
async def favicon():
    """Serve ``favicon.ico`` from the current working directory."""
    return FileResponse("favicon.ico")


@app.get("/proxy")
async def hack():
    """Return a static ``{"status": "ok"}`` payload."""
    return {"status": "ok"}


@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
async def catch_all(request: Request, path: str):
    """Forward any route not matched above to upstream, unmodified.

    Requests whose base URL hostname is a local address get a static status
    payload instead of being proxied.
    """
    if request.base_url.hostname in _LOCAL_HOSTNAMES:
        return {"status": "proxy running ok"}

    upstream_response = await call_official(request, path)
    return await return_edited_response(upstream_response)