368 lines
13 KiB
Python
368 lines
13 KiB
Python
import xml.etree.ElementTree as ET
|
|
|
|
import httpx
|
|
from fastapi import FastAPI, Request
|
|
from fastapi.responses import Response
|
|
from starlette.responses import JSONResponse
|
|
|
|
from client import AsyncCustomHost, NameSolver
|
|
from const import FEATURES_DICT, ENTITLEMENTS, CURRENT_DATE_MINUS_ONE_DAY, FEATURES, \
|
|
CURRENT_DATE_MINUS_ONE_DAY_Z, NEXT_MONTH_DATE_Z, PLEXAMP_FEATURES, TIMESTAMP_CURRENT_PLUS_30DAYS, \
|
|
TIMESTAMP_CURRENT_MINUS_30MIN, PLEXAMP_SUBSCRIPTION_XML
|
|
|
|
# ASGI application object; the route handlers below register themselves on it.
app = FastAPI()

# One shared asynchronous HTTP client for every request sent upstream.
# NOTE(review): when an explicit `transport` is passed, httpx does not build
# its default transport, so `verify` and `limits` presumably only take effect
# if AsyncCustomHost applies them itself — confirm against client.py.
official_client = httpx.AsyncClient(
    transport=AsyncCustomHost(NameSolver()),
    limits=httpx.Limits(max_connections=100, max_keepalive_connections=50),
    timeout=httpx.Timeout(30.0, connect=10.0),
    verify=True,
)
|
|
|
|
# Lower-case header names that are never copied verbatim between the
# client-facing side and the upstream side of the proxy.
HOP_BY_HOP_HEADERS = {
    # Connection-scoped headers.
    "connection", "keep-alive", "upgrade",
    "te", "trailer", "transfer-encoding",
    "proxy-authenticate", "proxy-authorization",
    # Not hop-by-hop, but they have to be regenerated for each leg.
    "content-length",  # let httpx calculate
    "host",  # must be rewritten
}
|
|
|
|
|
|
async def call_official(request: Request, path: str) -> httpx.Response:
    """Forward the incoming request upstream and return the upstream response.

    The request is re-issued through ``official_client`` against the
    scheme/host the client addressed (``request.base_url``) with ``path``
    appended. Method, body, query string and filtered headers are carried
    over.

    Args:
        request: The incoming request to replay.
        path: Upstream path without a leading slash.

    Returns:
        The upstream ``httpx.Response``. Its ``headers`` attribute is replaced
        with an ``httpx.Headers`` copy that omits every name listed in
        ``HOP_BY_HOP_HEADERS``.

    Raises:
        Nothing is caught here: any exception raised by
        ``official_client.request`` propagates to the caller.
    """
    body = await request.body()

    # Accept-Encoding is dropped together with the hop-by-hop headers so that
    # httpx advertises only the encodings it can decode itself; the response
    # handling in this module strips Content-Encoding and therefore needs the
    # body to arrive already decompressed.
    skipped = HOP_BY_HOP_HEADERS | {"accept-encoding"}
    req_headers = {
        name: value
        for name, value in request.headers.items()
        if name.lower() not in skipped
    }

    # Rewrite Host header to match upstream
    req_headers["host"] = request.url.hostname

    # The Cookie header is already forwarded through req_headers, so the
    # deprecated per-request ``cookies=`` argument of httpx is not used.
    upstream_response = await official_client.request(
        method=request.method,
        url=f"{str(request.base_url).rstrip('/')}/{path}",
        headers=req_headers,
        content=body,
        # multi_items() keeps repeated keys (?a=1&a=2); passing the mapping
        # itself would forward only one value per key.
        params=request.query_params.multi_items(),
    )

    # Keep an httpx.Headers instance (case-insensitive lookups) instead of
    # swapping in a plain dict.
    upstream_response.headers = httpx.Headers(
        [
            (name, value)
            for name, value in upstream_response.headers.multi_items()
            if name.lower() not in HOP_BY_HOP_HEADERS
        ]
    )

    return upstream_response
|
|
|
|
|
|
async def return_edited_response(
    response: httpx.Response,
    new_content: dict | list | bytes | str | None = None,
) -> Response:
    """Build the client-facing response from an upstream response.

    Behaviour by ``new_content``:

    * ``bytes`` / ``str``: replaces the body entirely, whatever the content
      type.
    * ``dict``: merged over the upstream payload (override keys win) when the
      upstream body is a JSON object.
    * ``list``: replaces the upstream payload when the upstream body is a
      JSON array.
    * ``None``, a non-JSON upstream body, or a body that fails to parse as
      JSON: the upstream body is relayed unchanged.

    If the override type does not match the JSON payload type, the payload is
    returned unmodified.

    Args:
        response: Upstream response; its ``content-encoding`` header is
            removed in place.
        new_content: Optional override, see above.

    Returns:
        A ``JSONResponse`` when a JSON payload was parsed, otherwise a plain
        ``Response``. Both carry the upstream status code and headers.
    """
    content_type = response.headers.get("content-type", "")
    response.headers.pop("content-encoding", None)  # body is already decoded

    def relay(body: bytes | str) -> Response:
        # Plain pass-through that keeps the upstream status and headers.
        return Response(
            content=body,
            status_code=response.status_code,
            headers=response.headers,
            media_type=content_type,
        )

    if isinstance(new_content, (bytes, str)):
        return relay(new_content)

    if new_content is None or not content_type.startswith("application/json"):
        return relay(response.content)

    try:
        data = response.json()
    except ValueError:
        # Empty or malformed JSON body (json.JSONDecodeError is a ValueError):
        # hand it to the client as-is instead of failing the request.
        return relay(response.content)

    if isinstance(data, dict) and isinstance(new_content, dict):
        data = {**data, **new_content}  # allow overrides
    elif isinstance(data, list) and isinstance(new_content, list):
        data = new_content  # replace entire list

    return JSONResponse(
        content=data,
        status_code=response.status_code,
        headers=response.headers,
    )
|
|
|
|
|
|
@app.get("/api/v2/home")
async def fake_get_home(request: Request):
    """Proxy ``GET /api/v2/home`` and pass a ``subscription: True`` override for the reply."""
    upstream_path = request.url.path.lstrip("/")
    proxied = await call_official(request, upstream_path)
    return await return_edited_response(proxied, {"subscription": True})
|
|
|
|
|
|
@app.get("/api/v2/features")
async def fake_get_features(request: Request):
    """Proxy ``GET /api/v2/features`` and pass ``FEATURES_DICT`` as the reply override."""
    upstream_path = request.url.path.lstrip("/")
    proxied = await call_official(request, upstream_path)
    return await return_edited_response(proxied, FEATURES_DICT)
|
|
|
|
|
|
def _rewrite_user_xml(raw: bytes) -> bytes:
    """Return the user XML document ``raw`` with its subscription data replaced.

    Raises:
        xml.etree.ElementTree.ParseError: if ``raw`` is not well-formed XML.
    """
    root = ET.fromstring(raw)
    root.set("subscriptionDescription", "Monthly Plex Pass")

    subscription = root.find("subscription")
    if subscription is not None:
        for attr, value in (
            ("active", "1"),
            ("subscribedAt", CURRENT_DATE_MINUS_ONE_DAY),
            ("status", "Active"),
            ("paymentService", "braintree"),
            ("plan", "monthly"),
            ("paymentNotificationId", "3421431"),
        ):
            subscription.set(attr, value)

        features = subscription.find("features")
        if features is not None:
            features.clear()
            for feature in FEATURES:
                ET.SubElement(features, "feature", {"id": feature})

    subscriptions = root.find("subscriptions")
    if subscriptions is not None:
        subscriptions.clear()
        sub = ET.SubElement(subscriptions, "subscription", {
            "id": "1234567",
            "mode": "monthly",
            "startsAt": CURRENT_DATE_MINUS_ONE_DAY_Z,
            "renewsAt": NEXT_MONTH_DATE_Z,
            "endsAt": "",
            "canceled": "0",
            "gracePeriod": "0",
            "onHold": "0",
            "canReactivate": "0",
            "canUpgrade": "0",
            "canDowngrade": "0",
            "canConvert": "0",
            "type": "plexpass",
            "braintreeId": "xyzxyz",
            "state": "active",
        })
        billing = ET.SubElement(sub, "billing")
        ET.SubElement(billing, "paymentMethod", {"id": "1234567"})
        ET.SubElement(billing, "internalPaymentMethod")

    # --- Replace <entitlements> ---
    for ents in root.findall("entitlements"):
        root.remove(ents)
    entitlements = ET.SubElement(root, "entitlements")
    for entitlement in ENTITLEMENTS:
        ET.SubElement(entitlements, "entitlement", {"id": entitlement})

    # --- Replace <roles> ---
    for roles in root.findall("roles"):
        root.remove(roles)
    roles = ET.SubElement(root, "roles")
    ET.SubElement(roles, "role", {"id": "plexpass"})

    # Pretty print XML
    ET.indent(root)

    return ET.tostring(root, encoding="utf-8", method="xml")


@app.get("/api/v2/user")
async def fake_get_user(request: Request):
    """Proxy ``GET /api/v2/user`` and override the subscription fields of the reply.

    JSON replies get a dict override; ``application/xml`` replies are parsed
    and rewritten by ``_rewrite_user_xml``. Any other content type, or an XML
    body that cannot be parsed, is relayed without an override.
    """
    upstream_response = await call_official(request, request.url.path.lstrip("/"))

    data_override = None

    content_type = upstream_response.headers.get("content-type", "")
    if content_type.startswith("application/json"):
        data_override = {
            "subscriptionDescription": "Monthly Plex Pass",
            "roles": ["plexpass"],
            "entitlements": ENTITLEMENTS,
            "subscription": {
                "active": True,
                "subscribedAt": CURRENT_DATE_MINUS_ONE_DAY,
                "status": "Active",
                "paymentService": "braintree",
                "plan": "monthly",
                "paymentNotificationId": "1234567",
                "features": FEATURES_DICT,
            },
        }
    elif content_type.startswith("application/xml"):
        try:
            data_override = _rewrite_user_xml(upstream_response.content)
        except ET.ParseError:
            # Malformed or empty upstream body: relay it untouched rather
            # than failing the whole request.
            data_override = None

    return await return_edited_response(upstream_response, data_override)
|
|
|
|
|
|
@app.post("/api/v2/users/signin")
async def fake_user_signin(request: Request):
    """Proxy ``POST /api/v2/users/signin`` and override subscription fields of a JSON reply.

    The feature list placed in the override depends on the ``X-Plex-Product``
    request header: ``PLEXAMP_FEATURES`` when it contains "plexamp"
    (case-insensitive), ``FEATURES_DICT`` otherwise, including when the
    header is absent. Non-JSON replies are relayed without an override.
    """
    upstream_response = await call_official(request, request.url.path.lstrip("/"))

    data_override = None

    content_type = upstream_response.headers.get("content-type", "")
    if content_type.startswith("application/json"):
        # Default to "" so a request without the header does not raise
        # AttributeError on None.lower().
        product = request.headers.get("X-Plex-Product", "").lower()
        features = PLEXAMP_FEATURES if "plexamp" in product else FEATURES_DICT

        data_override = {
            "subscriptionDescription": "Monthly Plex Pass",
            "roles": ["plexpass"],
            "entitlements": ENTITLEMENTS,
            "subscription": {
                "active": True,
                "subscribedAt": CURRENT_DATE_MINUS_ONE_DAY_Z,
                "status": "Active",
                "paymentService": "braintree",
                "plan": "monthly",
                "paymentNotificationId": "1234567",
                "canUpgrade": True,
                "features": features,
            },
            "subscriptions": [
                {
                    "id": 1234567,
                    "mode": "monthly",
                    "startsAt": TIMESTAMP_CURRENT_MINUS_30MIN,
                    "renewsAt": TIMESTAMP_CURRENT_PLUS_30DAYS,
                    "endsAt": None,
                    "billing": {
                        "paymentMethodId": 1234567,
                        "internalPaymentMethod": {},
                    },
                    "canceled": False,
                    "gracePeriod": False,
                    "onHold": False,
                    "canReactivate": False,
                    "canUpgrade": False,
                    "canDowngrade": False,
                    "canConvert": False,
                    "type": "plexpass",
                    "braintreeId": "xyzxyz",
                    "state": "active",
                }
            ],
        }

    return await return_edited_response(upstream_response, data_override)
|
|
|
|
|
|
@app.get("/api/v2/user.json")
async def fake_get_user_json(request: Request):
    """Proxy ``GET /api/v2/user.json`` and override subscription fields of a JSON reply.

    The feature list placed in the override depends on the ``X-Plex-Product``
    request header: ``PLEXAMP_FEATURES`` when it contains "plexamp"
    (case-insensitive), ``FEATURES_DICT`` otherwise, including when the
    header is absent. Non-JSON replies are relayed without an override.
    """
    upstream_response = await call_official(request, request.url.path.lstrip("/"))

    data_override = None

    content_type = upstream_response.headers.get("content-type", "")
    if content_type.startswith("application/json"):
        # Default to "" so a request without the header does not raise
        # AttributeError on None.lower().
        product = request.headers.get("X-Plex-Product", "").lower()
        features = PLEXAMP_FEATURES if "plexamp" in product else FEATURES_DICT

        data_override = {
            "subscriptionDescription": "Monthly Plex Pass",
            "roles": ["plexpass"],
            "entitlements": ENTITLEMENTS,
            "subscription": {
                "active": True,
                "subscribedAt": CURRENT_DATE_MINUS_ONE_DAY_Z,
                "status": "Active",
                "paymentService": "braintree",
                "plan": "monthly",
                "paymentNotificationId": "1234567",
                "canUpgrade": True,
                "features": features,
            },
            "subscriptions": [
                {
                    "id": 1234567,
                    "mode": "monthly",
                    "startsAt": TIMESTAMP_CURRENT_MINUS_30MIN,
                    "renewsAt": TIMESTAMP_CURRENT_PLUS_30DAYS,
                    "endsAt": None,
                    "billing": {
                        "paymentMethodId": 1234567,
                        "internalPaymentMethod": {},
                    },
                    "canceled": False,
                    "gracePeriod": False,
                    "onHold": False,
                    "canReactivate": False,
                    "canUpgrade": False,
                    "canDowngrade": False,
                    "canConvert": False,
                    "type": "plexpass",
                    "braintreeId": "xyzxyz",
                    "state": "active",
                }
            ],
        }

    return await return_edited_response(upstream_response, data_override)
|
|
|
|
|
|
@app.post("/api/claim/exchange")
async def fake_claim_exchange(request: Request):
    """Proxy ``POST /api/claim/exchange`` and rewrite an ``application/xml`` reply.

    In an XML reply an existing ``<subscription>`` child is replaced by a
    copy of ``PLEXAMP_SUBSCRIPTION_XML`` (appended at the end of the root),
    and ``<entitlements>`` and ``<roles>`` are rebuilt. Any other content
    type, or an XML body that cannot be parsed, is relayed without an
    override.
    """
    import copy  # local import: the only stdlib name this block adds

    upstream_response = await call_official(request, request.url.path.lstrip("/"))

    data_override = None

    content_type = upstream_response.headers.get("content-type", "")
    if content_type.startswith("application/xml"):
        try:
            data = ET.fromstring(upstream_response.content)
        except ET.ParseError:
            # Malformed or empty upstream body: relay it untouched.
            return await return_edited_response(upstream_response, None)

        subscription = data.find("subscription")
        if subscription is not None:
            data.remove(subscription)
            # Append a private copy: ET.indent() below rewrites text/tail of
            # every node in the tree and would otherwise mutate the shared
            # module-level element on each request.
            # (Element.append requires an Element, so the constant is
            # presumably an ET.Element — defined in const.py.)
            data.append(copy.deepcopy(PLEXAMP_SUBSCRIPTION_XML))

        # --- Replace <entitlements> ---
        for ents in data.findall("entitlements"):
            data.remove(ents)
        entitlements = ET.SubElement(data, "entitlements", {"all": "1"})
        for entitlement in ENTITLEMENTS:
            ET.SubElement(entitlements, "entitlement", {"id": entitlement})

        # --- Replace <roles> ---
        for roles in data.findall("roles"):
            data.remove(roles)
        roles = ET.SubElement(data, "roles")
        ET.SubElement(roles, "role", {"id": "plexpass"})

        # Pretty print XML
        ET.indent(data)

        data_override = ET.tostring(data, encoding="utf-8", method="xml")

    return await return_edited_response(upstream_response, data_override)
|
|
|
|
|
|
@app.get("/api/hack")
async def hack_endpoint():
    """Return a fixed status payload; this route makes no upstream call."""
    status_message = "OK, Plex Pass features proxy enabled"
    return {"status": status_message}
|
|
|
|
|
|
@app.api_route(
    "/{path:path}",
    methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"],
)
async def catch_all(request: Request, path: str):
    """Relay any request that reaches this route upstream, without a body override."""
    proxied = await call_official(request, path)
    relayed = await return_edited_response(proxied)
    return relayed
|