working web hack

This commit is contained in:
Mathieu Broillet 2025-08-24 17:23:41 +02:00
parent 44157290cc
commit 7c595e9d67
Signed by: mathieub
GPG Key ID: 4428608CDA3A98D3
2 changed files with 393 additions and 3003 deletions

3177
const.py

File diff suppressed because it is too large Load Diff

View File

@ -1,10 +1,13 @@
import xml.etree.ElementTree as ET
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import Response
from starlette.responses import JSONResponse
from client import AsyncCustomHost, NameSolver
from const import OFFICIAL_API
from const import OFFICIAL_API, FEATURES_DICT, ENTITLEMENTS, CURRENT_DATE_MINUS_ONE_DAY, FEATURES, \
CURRENT_DATE_MINUS_ONE_DAY_Z, NEXT_MONTH_DATE_Z
app = FastAPI()
@ -31,8 +34,9 @@ HOP_BY_HOP_HEADERS = {
}
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
async def catch_all(request: Request, path: str):
async def call_official(request: Request, path: str) -> httpx.Response:
"""Forward the incoming request to the official API and return the response."""
# Copy request body
body = await request.body()
@ -56,28 +60,213 @@ async def catch_all(request: Request, path: str):
cookies=request.cookies,
)
# Copy response headers, filtering hop-by-hop
# Filter response headers hop-by-hop
resp_headers = {
k: v
for k, v in upstream_response.headers.items()
if k.lower() not in HOP_BY_HOP_HEADERS
}
upstream_response.headers = resp_headers
content_type = upstream_response.headers.get("content-type", "")
return upstream_response
async def return_edited_response(
response: httpx.Response,
new_content: dict | bytes | None = None
) -> Response:
"""
Return possibly modified response.
If new_content is a dict and the response is JSON, it will merge/override keys.
If new_content and response data are lists, new_content will replace the entire response data.
If new_content is bytes or str, it will replace the response body entirely.
"""
content_type = response.headers.get("content-type", "")
response.headers.pop("content-encoding", None) # body is already decoded
resp_headers.pop("content-encoding", None) # remove, since body is decoded
if content_type.startswith("application/json"):
response = JSONResponse(
content=upstream_response.json(),
status_code=upstream_response.status_code,
headers=resp_headers
data = response.json()
if isinstance(new_content, dict): # allow overrides
data = {**data, **new_content}
if isinstance(data, list) and isinstance(new_content, list):
data = new_content # replace entire list
return JSONResponse(
content=data,
status_code=response.status_code,
headers=response.headers
)
else:
response = Response(
content=upstream_response.content,
status_code=upstream_response.status_code,
headers=resp_headers,
content = new_content if isinstance(new_content, (bytes, str)) else response.content
return Response(
content=content,
status_code=response.status_code,
headers=response.headers,
media_type=content_type
)
return response
@app.get("/api/v2/home")
async def fake_get_home(request: Request):
upstream_response = await call_official(request, request.url.path.lstrip("/"))
data_override = {
"subscription": True,
}
return await return_edited_response(upstream_response, data_override)
@app.get("/api/v2/features")
async def fake_get_features(request: Request):
upstream_response = await call_official(request, request.url.path.lstrip("/"))
data_override = FEATURES_DICT
return await return_edited_response(upstream_response, data_override)
@app.get("/api/v2/user")
async def fake_get_user(request: Request):
upstream_response = await call_official(request, request.url.path.lstrip("/"))
data_override = None
content_type = upstream_response.headers.get("content-type", "")
if content_type.startswith("application/json"):
data_override = {
"subscriptionDescription": "Monthly Plex Pass",
"roles": ["plexpass"],
"entitlements": ENTITLEMENTS,
"subscription": {
"active": True,
"subscribedAt": CURRENT_DATE_MINUS_ONE_DAY,
"status": "Active",
"paymentService": "braintree",
"plan": "monthly",
"paymentNotificationId": "1234567",
"features": FEATURES_DICT
}
}
elif content_type.startswith("application/xml"):
data = ET.fromstring(upstream_response.content)
data.attrib["subscriptionDescription"] = "Monthly Plex Pass"
subscription = data.find("subscription")
if subscription is not None:
subscription.set("active", "1")
subscription.set("subscribedAt", CURRENT_DATE_MINUS_ONE_DAY)
subscription.set("status", "Active")
subscription.set("paymentService", "braintree")
subscription.set("plan", "monthly")
subscription.set("paymentNotificationId", "3421431")
features = subscription.find("features")
if features is not None:
features.clear()
for feature in FEATURES:
ET.SubElement(features, "feature", {"id": feature})
subscriptions = data.find("subscriptions")
if subscriptions is not None:
subscriptions.clear()
sub = ET.SubElement(subscriptions, "subscription", {
"id": "1234567",
"mode": "monthly",
"startsAt": CURRENT_DATE_MINUS_ONE_DAY_Z,
"renewsAt": NEXT_MONTH_DATE_Z,
"endsAt": "",
"canceled": "0",
"gracePeriod": "0",
"onHold": "0",
"canReactivate": "0",
"canUpgrade": "0",
"canDowngrade": "0",
"canConvert": "0",
"type": "plexpass",
"braintreeId": "xyzxyz",
"state": "active"
})
billing = ET.SubElement(sub, "billing")
ET.SubElement(billing, "paymentMethod", {"id": "1234567"})
ET.SubElement(billing, "internalPaymentMethod")
# --- Replace <entitlements> ---
for ents in data.findall("entitlements"):
data.remove(ents)
entitlements = ET.SubElement(data, "entitlements")
for entitlement in ENTITLEMENTS:
ET.SubElement(entitlements, "entitlement", {"id": entitlement})
# --- Replace <roles> ---
for roles in data.findall("roles"):
data.remove(roles)
roles = ET.SubElement(data, "roles")
ET.SubElement(roles, "role", {"id": "plexpass"})
# Pretty print XML
ET.indent(data)
data_override = ET.tostring(data, encoding="utf-8", method="xml")
return await return_edited_response(upstream_response, data_override)
@app.post("/api/v2/users/signin")
async def fake_user_signin(request: Request):
upstream_response = await call_official(request, request.url.path.lstrip("/"))
data_override = None
content_type = upstream_response.headers.get("content-type", "")
if content_type.startswith("application/json"):
data_override = {
"subscriptionDescription": "Monthly Plex Pass",
"roles": ["plexpass"],
"entitlements": ENTITLEMENTS,
"subscription": {
"active": True,
"subscribedAt": CURRENT_DATE_MINUS_ONE_DAY_Z,
"status": "Active",
"paymentService": "braintree",
"plan": "monthly",
"paymentNotificationId": "1234567",
"canUpgrade": True,
"features": FEATURES_DICT
},
"subscriptions": [
{
"id": 1234567,
"mode": "monthly",
"startsAt": 1755963427,
"renewsAt": 1758585600,
"endsAt": None,
"billing": {
"paymentMethodId": 1234567,
"internalPaymentMethod": {}
},
"canceled": False,
"gracePeriod": False,
"onHold": False,
"canReactivate": False,
"canUpgrade": False,
"canDowngrade": False,
"canConvert": False,
"type": "plexpass",
"braintreeId": "xyzxyz",
"state": "active"
}
]
}
return await return_edited_response(upstream_response, data_override)
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
async def catch_all(request: Request, path: str):
upstream_response = await call_official(request, path)
return await return_edited_response(upstream_response)