273 lines
9.0 KiB
Python

import xml.etree.ElementTree as ET
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import Response
from starlette.responses import JSONResponse
from client import AsyncCustomHost, NameSolver
from const import OFFICIAL_API, FEATURES_DICT, ENTITLEMENTS, CURRENT_DATE_MINUS_ONE_DAY, FEATURES, \
CURRENT_DATE_MINUS_ONE_DAY_Z, NEXT_MONTH_DATE_Z
app = FastAPI()

# Shared async client used for every request relayed to OFFICIAL_API.
# Timeouts: 30 s by default, 10 s for establishing the connection.
# NOTE(review): httpx documents `verify` and `limits` as inputs for the
# transport it builds itself; with an explicit `transport=` they may have no
# effect -- confirm that AsyncCustomHost applies TLS verification and pool
# limits on its own.
official_client = httpx.AsyncClient(
    transport=AsyncCustomHost(NameSolver()),
    timeout=httpx.Timeout(30.0, connect=10.0),
    limits=httpx.Limits(max_connections=100, max_keepalive_connections=50),
    verify=True,
)
# Lower-case header names that are never copied verbatim across the proxy.
HOP_BY_HOP_HEADERS = {
    # Hop-by-hop headers: only meaningful for a single connection.
    "connection",
    "keep-alive",
    "proxy-authenticate",
    "proxy-authorization",
    "te",
    "trailer",
    "transfer-encoding",
    "upgrade",
} | {
    # Not hop-by-hop, but regenerated on each side of the proxy.
    "content-length",  # let httpx calculate
    "host",  # must be rewritten
}
async def call_official(request: Request, path: str) -> httpx.Response:
    """Forward the incoming request to the official API and return the response.

    Args:
        request: Incoming request whose method, headers, body and query
            string are replayed upstream.
        path: Path appended to ``OFFICIAL_API`` after a ``/``.

    Returns:
        The upstream ``httpx.Response``. Its ``headers`` attribute is replaced
        by a plain ``dict`` without any name listed in ``HOP_BY_HOP_HEADERS``.
    """
    body = await request.body()

    # Copy headers, filtering hop-by-hop; Host is rewritten to match upstream.
    req_headers = {
        name: value
        for name, value in request.headers.items()
        if name.lower() not in HOP_BY_HOP_HEADERS
    }
    req_headers["host"] = httpx.URL(OFFICIAL_API).host

    upstream_response = await official_client.request(
        method=request.method,
        url=f"{OFFICIAL_API}/{path}",
        headers=req_headers,
        content=body,
        # multi_items() keeps repeated keys (?a=1&a=2); handing over the
        # mapping itself would forward only one value per key.
        params=request.query_params.multi_items(),
        # Cookies are not passed separately: the Cookie header is not in
        # HOP_BY_HOP_HEADERS, so it already travels in req_headers, and httpx
        # deprecates per-request `cookies=`.
    )

    # Filter response headers hop-by-hop.
    # NOTE(review): this collapses the headers into a plain dict, so a header
    # that upstream sends several times (presumably Set-Cookie) ends up as a
    # single entry -- confirm whether that matters for clients.
    resp_headers = {
        name: value
        for name, value in upstream_response.headers.items()
        if name.lower() not in HOP_BY_HOP_HEADERS
    }
    upstream_response.headers = resp_headers
    return upstream_response
async def return_edited_response(
    response: httpx.Response,
    new_content: dict | list | bytes | str | None = None,
) -> Response:
    """Return the upstream response, optionally with an edited body.

    Args:
        response: Upstream response to relay. Its ``content-encoding`` header
            is removed in place because the body is relayed already decoded.
        new_content: Optional override.
            * ``dict``: merged over the upstream body (override keys win)
              when that body is a JSON object.
            * ``list``: replaces the upstream body when that body is a JSON
              array.
            * ``bytes`` / ``str``: replaces the body entirely, whatever the
              upstream content type is.
            * ``None`` or a type that does not match the upstream body: the
              upstream body is relayed unchanged.

    Returns:
        A ``JSONResponse`` when the upstream body is parseable JSON and no raw
        replacement was given; otherwise a plain ``Response``. Both keep the
        upstream status code and headers.
    """
    content_type = response.headers.get("content-type", "")
    response.headers.pop("content-encoding", None)  # body is already decoded

    replaces_body = isinstance(new_content, (bytes, str))

    if content_type.startswith("application/json") and not replaces_body:
        try:
            data = response.json()
        except ValueError:
            # Empty or malformed JSON body (e.g. HEAD / 204 / 304 replies):
            # there is nothing to merge into, so relay the raw body below.
            pass
        else:
            if isinstance(data, dict) and isinstance(new_content, dict):
                data = {**data, **new_content}  # allow overrides
            elif isinstance(data, list) and isinstance(new_content, list):
                data = new_content  # replace entire list
            return JSONResponse(
                content=data,
                status_code=response.status_code,
                headers=response.headers,
            )

    content = new_content if replaces_body else response.content
    return Response(
        content=content,
        status_code=response.status_code,
        headers=response.headers,
        # Avoid emitting an empty Content-Type when upstream sent none.
        media_type=content_type or None,
    )
@app.get("/api/v2/home")
async def fake_get_home(request: Request):
    """Proxy ``/api/v2/home`` and overlay ``"subscription": True`` on the reply."""
    upstream_path = request.url.path.lstrip("/")
    proxied = await call_official(request, upstream_path)
    return await return_edited_response(proxied, {"subscription": True})
@app.get("/api/v2/features")
async def fake_get_features(request: Request):
    """Proxy ``/api/v2/features`` and overlay ``FEATURES_DICT`` on the reply."""
    upstream_path = request.url.path.lstrip("/")
    proxied = await call_official(request, upstream_path)
    return await return_edited_response(proxied, FEATURES_DICT)
def _rewrite_user_xml(xml_body: bytes) -> bytes:
    """Return the upstream user XML rewritten with Plex Pass subscription data.

    Raises:
        xml.etree.ElementTree.ParseError: If ``xml_body`` is not well-formed
            XML (including an empty body).
    """
    root = ET.fromstring(xml_body)
    root.attrib["subscriptionDescription"] = "Monthly Plex Pass"

    subscription = root.find("subscription")
    if subscription is not None:
        subscription_attrs = {
            "active": "1",
            "subscribedAt": CURRENT_DATE_MINUS_ONE_DAY,
            "status": "Active",
            "paymentService": "braintree",
            "plan": "monthly",
            "paymentNotificationId": "3421431",
        }
        for attr_name, attr_value in subscription_attrs.items():
            subscription.set(attr_name, attr_value)
        features = subscription.find("features")
        if features is not None:
            # clear() drops the existing children (and attributes/text)
            # before the replacement feature list is added.
            features.clear()
            for feature in FEATURES:
                ET.SubElement(features, "feature", {"id": feature})

    subscriptions = root.find("subscriptions")
    if subscriptions is not None:
        subscriptions.clear()
        sub = ET.SubElement(subscriptions, "subscription", {
            "id": "1234567",
            "mode": "monthly",
            "startsAt": CURRENT_DATE_MINUS_ONE_DAY_Z,
            "renewsAt": NEXT_MONTH_DATE_Z,
            "endsAt": "",
            "canceled": "0",
            "gracePeriod": "0",
            "onHold": "0",
            "canReactivate": "0",
            "canUpgrade": "0",
            "canDowngrade": "0",
            "canConvert": "0",
            "type": "plexpass",
            "braintreeId": "xyzxyz",
            "state": "active",
        })
        billing = ET.SubElement(sub, "billing")
        ET.SubElement(billing, "paymentMethod", {"id": "1234567"})
        ET.SubElement(billing, "internalPaymentMethod")

    # Replace <entitlements>: drop every existing element, append a fresh one.
    for stale in root.findall("entitlements"):
        root.remove(stale)
    entitlements = ET.SubElement(root, "entitlements")
    for entitlement in ENTITLEMENTS:
        ET.SubElement(entitlements, "entitlement", {"id": entitlement})

    # Replace <roles> the same way.
    for stale in root.findall("roles"):
        root.remove(stale)
    roles = ET.SubElement(root, "roles")
    ET.SubElement(roles, "role", {"id": "plexpass"})

    ET.indent(root)  # pretty print
    return ET.tostring(root, encoding="utf-8", method="xml")


@app.get("/api/v2/user")
async def fake_get_user(request: Request):
    """Proxy ``/api/v2/user`` and overlay Plex Pass subscription data.

    JSON replies get a dict override; ``application/xml`` replies are parsed
    and rewritten. Any other content type, or an XML body that cannot be
    parsed, is relayed unchanged.
    """
    upstream_response = await call_official(request, request.url.path.lstrip("/"))
    data_override = None
    content_type = upstream_response.headers.get("content-type", "")

    if content_type.startswith("application/json"):
        data_override = {
            "subscriptionDescription": "Monthly Plex Pass",
            "roles": ["plexpass"],
            "entitlements": ENTITLEMENTS,
            "subscription": {
                "active": True,
                "subscribedAt": CURRENT_DATE_MINUS_ONE_DAY,
                "status": "Active",
                "paymentService": "braintree",
                "plan": "monthly",
                "paymentNotificationId": "1234567",
                "features": FEATURES_DICT,
            },
        }
    elif content_type.startswith("application/xml"):
        try:
            data_override = _rewrite_user_xml(upstream_response.content)
        except ET.ParseError:
            # Empty or malformed XML body: relay the upstream reply as-is
            # instead of failing the whole request.
            data_override = None

    return await return_edited_response(upstream_response, data_override)
@app.post("/api/v2/users/signin")
async def fake_user_signin(request: Request):
    """Proxy the sign-in call and overlay Plex Pass subscription data.

    Only ``application/json`` replies are edited; any other content type is
    relayed unchanged. ``startsAt`` / ``renewsAt`` are epoch seconds computed
    relative to the current time (one day ago / thirty days ahead) so the
    advertised subscription period never lies entirely in the past.
    """
    import time  # local import keeps this block self-contained

    upstream_response = await call_official(request, request.url.path.lstrip("/"))
    data_override = None
    content_type = upstream_response.headers.get("content-type", "")

    if content_type.startswith("application/json"):
        seconds_per_day = 24 * 60 * 60
        now = int(time.time())
        data_override = {
            "subscriptionDescription": "Monthly Plex Pass",
            "roles": ["plexpass"],
            "entitlements": ENTITLEMENTS,
            "subscription": {
                "active": True,
                "subscribedAt": CURRENT_DATE_MINUS_ONE_DAY_Z,
                "status": "Active",
                "paymentService": "braintree",
                "plan": "monthly",
                "paymentNotificationId": "1234567",
                "canUpgrade": True,
                "features": FEATURES_DICT,
            },
            "subscriptions": [
                {
                    "id": 1234567,
                    "mode": "monthly",
                    "startsAt": now - seconds_per_day,
                    "renewsAt": now + 30 * seconds_per_day,
                    "endsAt": None,
                    "billing": {
                        "paymentMethodId": 1234567,
                        "internalPaymentMethod": {},
                    },
                    "canceled": False,
                    "gracePeriod": False,
                    "onHold": False,
                    "canReactivate": False,
                    "canUpgrade": False,
                    "canDowngrade": False,
                    "canConvert": False,
                    "type": "plexpass",
                    "braintreeId": "xyzxyz",
                    "state": "active",
                },
            ],
        }

    return await return_edited_response(upstream_response, data_override)
@app.api_route(
    "/{path:path}",
    methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"],
)
async def catch_all(request: Request, path: str):
    """Relay any request not matched by a dedicated route, without an override."""
    proxied = await call_official(request, path)
    return await return_edited_response(proxied)