267 lines
10 KiB
Python
267 lines
10 KiB
Python
import xml.etree.ElementTree as ET
|
|
from urllib.parse import parse_qs
|
|
|
|
from fastapi import FastAPI, Request
|
|
from starlette.responses import FileResponse
|
|
|
|
from const import FEATURES_DICT, ENTITLEMENTS, CURRENT_DATE_MINUS_ONE_DAY, FEATURES, \
|
|
CURRENT_DATE_MINUS_ONE_DAY_Z, NEXT_MONTH_DATE_Z, PLEXAMP_FEATURES, TIMESTAMP_CURRENT_PLUS_30DAYS, \
|
|
TIMESTAMP_CURRENT_MINUS_30MIN, PLEXAMP_SUBSCRIPTION_XML, Hosts
|
|
from utils import call_official, return_edited_response, host_required, fake_response
|
|
|
|
# FastAPI application instance; the route decorators in this module
# (@app.get / @app.post / @app.options / @app.api_route) register on it.
app = FastAPI()
|
|
|
|
|
|
# GET /api/v2/home
# Forwards the request to the official endpoint (same path, leading "/"
# stripped) and hands the upstream response to return_edited_response
# together with an override that sets "subscription" to True.
@app.get("/api/v2/home", tags=[Hosts.CLIENTS.value])
async def fake_get_home(request: Request, _=host_required([Hosts.CLIENTS])):
    official_path = request.url.path.lstrip("/")
    official = await call_official(request, official_path)
    return await return_edited_response(official, {"subscription": True})
|
|
|
|
|
|
# GET /api/v2/features
# Forwards the request to the official endpoint and passes FEATURES_DICT
# as the override for the upstream response.
@app.get("/api/v2/features", tags=[Hosts.CLIENTS.value])
async def fake_get_features(request: Request, _=host_required([Hosts.CLIENTS])):
    official_path = request.url.path.lstrip("/")
    official = await call_official(request, official_path)
    return await return_edited_response(official, FEATURES_DICT)
|
|
|
|
|
|
def _user_json_override():
    # Build the override dict used when the upstream /api/v2/user reply is
    # JSON: subscription description, roles, entitlements and a
    # "subscription" object marked active.
    return {
        "subscriptionDescription": "Monthly Plex Pass",
        "roles": ["plexpass"],
        "entitlements": ENTITLEMENTS,
        "subscription": {
            "active": True,
            "subscribedAt": CURRENT_DATE_MINUS_ONE_DAY,
            "status": "Active",
            "paymentService": "braintree",
            "plan": "monthly",
            "paymentNotificationId": "1234567",
            "features": FEATURES_DICT,
        },
    }


def _patch_user_xml(data):
    # Rewrite the parsed upstream XML root element *in place* so that it
    # carries the same subscription information as the JSON override.
    data.attrib["subscriptionDescription"] = "Monthly Plex Pass"

    # <subscription>: only touched when upstream already sent one.
    subscription = data.find("subscription")
    if subscription is not None:
        subscription_attrs = {
            "active": "1",
            "subscribedAt": CURRENT_DATE_MINUS_ONE_DAY,
            "status": "Active",
            "paymentService": "braintree",
            "plan": "monthly",
            "paymentNotificationId": "1234567",
        }
        for attr_name, attr_value in subscription_attrs.items():
            subscription.set(attr_name, attr_value)

        # <features>: drop whatever upstream listed and emit one <feature>
        # per entry of FEATURES. Element.clear() also resets the element's
        # attributes, text and tail.
        features = subscription.find("features")
        if features is not None:
            features.clear()
            for feature in FEATURES:
                ET.SubElement(features, "feature", {"id": feature})

    # <subscriptions>: replaced by a single monthly entry with billing info.
    subscriptions = data.find("subscriptions")
    if subscriptions is not None:
        subscriptions.clear()
        sub = ET.SubElement(subscriptions, "subscription", {
            "id": "1234567",
            "mode": "monthly",
            "startsAt": CURRENT_DATE_MINUS_ONE_DAY_Z,
            "renewsAt": NEXT_MONTH_DATE_Z,
            "endsAt": "",
            "canceled": "0",
            "gracePeriod": "0",
            "onHold": "0",
            "canReactivate": "0",
            "canUpgrade": "0",
            "canDowngrade": "0",
            "canConvert": "0",
            "type": "plexpass",
            "braintreeId": "xyzxyz",
            "state": "active",
        })
        billing = ET.SubElement(sub, "billing")
        ET.SubElement(billing, "paymentMethod", {"id": "1234567"})
        ET.SubElement(billing, "internalPaymentMethod")

    # --- Replace <entitlements> ---
    # findall() returns a list, so removing children while looping is safe.
    for stale_entitlements in data.findall("entitlements"):
        data.remove(stale_entitlements)
    entitlements = ET.SubElement(data, "entitlements")
    for entitlement in ENTITLEMENTS:
        ET.SubElement(entitlements, "entitlement", {"id": entitlement})

    # --- Replace <roles> ---
    for stale_roles in data.findall("roles"):
        data.remove(stale_roles)
    roles = ET.SubElement(data, "roles")
    ET.SubElement(roles, "role", {"id": "plexpass"})


# GET /api/v2/user
# Forwards the request to the official endpoint, then builds an override
# depending on the upstream Content-Type:
#   * application/json -> a dict (see _user_json_override)
#   * application/xml  -> the upstream XML, patched and re-serialised as bytes
#   * anything else    -> None (no override)
# If the upstream body is labelled XML but cannot be parsed (for example an
# empty error body), the override stays None instead of raising, so the
# upstream response is handed on the same way as for other content types.
@app.get("/api/v2/user", tags=[Hosts.CLIENTS.value])
async def fake_get_user(request: Request, _=host_required([Hosts.CLIENTS])):
    upstream_response = await call_official(request, request.url.path.lstrip("/"))

    data_override = None

    content_type = upstream_response.headers.get("content-type", "")
    if content_type.startswith("application/json"):
        data_override = _user_json_override()
    elif content_type.startswith("application/xml"):
        try:
            data = ET.fromstring(upstream_response.content)
        except ET.ParseError:
            # Unparseable XML: leave data_override as None.
            pass
        else:
            _patch_user_xml(data)

            # Pretty print XML
            ET.indent(data)

            data_override = ET.tostring(data, encoding="utf-8", method="xml")

    return await return_edited_response(upstream_response, data_override)
|
|
|
|
|
|
# POST /api/v2/users/signin and GET /api/v2/user.json
# Forwards the request to the official endpoint. For JSON upstream replies
# it passes an override containing roles, entitlements, an active
# "subscription" object and a one-element "subscriptions" list; for any
# other content type the override is None.
# The feature set is PLEXAMP_FEATURES when the X-Plex-Product request header
# contains "plexamp" (case-insensitive), otherwise FEATURES_DICT.
@app.post("/api/v2/users/signin", tags=[Hosts.CLIENTS.value, Hosts.PLEX.value], name="Fake signin")
@app.get("/api/v2/user.json", tags=[Hosts.PLEX.value], name="Fake user.json", description="Used by Plexamp")
async def fake_user_json_or_signin(request: Request, _=host_required([Hosts.CLIENTS, Hosts.PLEX])):
    official = await call_official(request, request.url.path.lstrip("/"))

    upstream_type = official.headers.get("content-type", "")
    if not upstream_type.startswith("application/json"):
        # Non-JSON reply: nothing to override.
        return await return_edited_response(official, None)

    product = request.headers.get("X-Plex-Product", "").lower()
    feature_set = PLEXAMP_FEATURES if "plexamp" in product else FEATURES_DICT

    active_subscription = {
        "active": True,
        "subscribedAt": CURRENT_DATE_MINUS_ONE_DAY_Z,
        "status": "Active",
        "paymentService": "braintree",
        "plan": "monthly",
        "paymentNotificationId": "1234567",
        "canUpgrade": True,
        "features": feature_set,
    }

    subscription_entry = {
        "id": 1234567,
        "mode": "monthly",
        "startsAt": TIMESTAMP_CURRENT_MINUS_30MIN,
        "renewsAt": TIMESTAMP_CURRENT_PLUS_30DAYS,
        "endsAt": None,
        "billing": {
            "paymentMethodId": 1234567,
            "internalPaymentMethod": {},
        },
        "canceled": False,
        "gracePeriod": False,
        "onHold": False,
        "canReactivate": False,
        "canUpgrade": False,
        "canDowngrade": False,
        "canConvert": False,
        "type": "plexpass",
        "braintreeId": "xyzxyz",
        "state": "active",
    }

    payload = {
        "subscriptionDescription": "Monthly Plex Pass",
        "roles": ["plexpass"],
        "entitlements": ENTITLEMENTS,
        "subscription": active_subscription,
        "subscriptions": [subscription_entry],
    }

    return await return_edited_response(official, payload)
|
|
|
|
|
|
# POST /api/claim/exchange
# Forwards the request to the official endpoint. When the upstream reply is
# XML, the tree is patched (subscription, entitlements, roles) and passed on
# as serialised bytes; otherwise the override is None.
# If the upstream body is labelled XML but cannot be parsed (for example an
# empty error body), the override stays None instead of raising.
@app.post("/api/claim/exchange", tags=[Hosts.PLEX.value])
async def fake_claim_exchange(request: Request, _=host_required([Hosts.PLEX])):
    import copy  # local import: only needed for the XML branch below

    upstream_response = await call_official(request, request.url.path.lstrip("/"))

    data_override = None

    content_type = upstream_response.headers.get("content-type", "")
    if content_type.startswith("application/xml"):
        try:
            data = ET.fromstring(upstream_response.content)
        except ET.ParseError:
            # Unparseable XML: leave data_override as None.
            data = None

        if data is not None:
            subscription = data.find("subscription")
            if subscription is not None:
                data.remove(subscription)
                # Append a copy: ET.indent() below rewrites text/tail of every
                # element in the tree, which would otherwise mutate the shared
                # module-level constant on each request.
                # NOTE(review): assumes PLEXAMP_SUBSCRIPTION_XML is an
                # ElementTree Element (it is passed to Element.append).
                data.append(copy.deepcopy(PLEXAMP_SUBSCRIPTION_XML))

            # --- Replace <entitlements> ---
            # findall() returns a list, so removing while looping is safe.
            for stale_entitlements in data.findall("entitlements"):
                data.remove(stale_entitlements)
            entitlements = ET.SubElement(data, "entitlements", {"all": "1"})
            for entitlement in ENTITLEMENTS:
                ET.SubElement(entitlements, "entitlement", {"id": entitlement})

            # --- Replace <roles> ---
            for stale_roles in data.findall("roles"):
                data.remove(stale_roles)
            roles = ET.SubElement(data, "roles")
            ET.SubElement(roles, "role", {"id": "plexpass"})

            # Pretty print XML
            ET.indent(data)

            data_override = ET.tostring(data, encoding="utf-8", method="xml")

    return await return_edited_response(upstream_response, data_override)
|
|
|
|
|
|
# POST /api/v2/user/webhooks
# Forwards the request to the official endpoint, then passes as override a
# list of {"url": ...} dicts, one per "urls[]" value found in the
# form-encoded request body.
@app.post("/api/v2/user/webhooks", tags=[Hosts.CLIENTS.value])
async def fake_get_user_webhooks(request: Request, _=host_required([Hosts.CLIENTS])):
    # FIXME: this doesn't really work because webhooks are sent from Plex
    # themselves, so unless we implement a complete webhook emitter this is
    # kinda useless, but at least it removes errors from the UI...

    official = await call_official(request, request.url.path.lstrip("/"))

    form_fields = parse_qs((await request.body()).decode("utf-8"))
    submitted_urls = form_fields.get("urls[]", [])
    webhooks = [{"url": webhook_url} for webhook_url in submitted_urls]

    return await return_edited_response(official, webhooks)
|
|
|
|
|
|
# GET /api/v2/user/webhooks
# Forwards the request to the official endpoint, forces the upstream status
# code to 200 and passes a fixed single-entry webhook list as the override
# with ignore_official_data=True.
# The placeholder URL spells out the limitation: webhooks are sent by Plex
# itself, so this proxy cannot actually deliver them.
@app.get("/api/v2/user/webhooks", tags=[Hosts.CLIENTS.value])
async def fake_list_user_webhooks(request: Request, _=host_required([Hosts.CLIENTS])):
    official = await call_official(request, request.url.path.lstrip("/"))
    official.status_code = 200

    placeholder = {"url": "https://webhooks.are.not.supported.because.plex.sends.them/not/your/server"}

    return await return_edited_response(official, [placeholder], ignore_official_data=True)
|
|
|
|
|
|
# POST/OPTIONS /v1/initialize, /v1/rgstr and /collect/event
# These routes never call the official servers: they answer directly with a
# fixed payload and HTTP 200 via fake_response.
@app.post("/v1/initialize", tags=[Hosts.FEATURES.value])
@app.options("/v1/initialize", tags=[Hosts.FEATURES.value])
@app.post("/v1/rgstr", tags=[Hosts.FEATURES.value])
@app.options("/v1/rgstr", tags=[Hosts.FEATURES.value])
@app.post("/collect/event", tags=[Hosts.ANALYTICS.value])
@app.options("/collect/event", tags=[Hosts.ANALYTICS.value])
async def block_telemetry():
    disabled_notice = {"status": "disabled by proxy :)"}
    return fake_response(disabled_notice, status_code=200)
|
|
|
|
|
|
# GET /favicon.ico
# Serves the file "favicon.ico"; the path is relative, so it is resolved
# against the process working directory.
@app.get("/favicon.ico")
async def favicon():
    icon_file = "favicon.ico"
    return FileResponse(icon_file)
|
|
|
|
|
|
# GET /proxy
# Returns a fixed {"status": "ok"} body without contacting upstream.
@app.get("/proxy")
async def hack():
    status_body = {"status": "ok"}
    return status_body
|
|
|
|
|
|
# Fallback route for every path not matched by an earlier route.
# Requests whose base URL hostname is a loopback/wildcard name get a fixed
# status body; everything else is forwarded to the official endpoint and
# handed to return_edited_response without an override.
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
async def catch_all(request: Request, path: str):
    local_hostnames = ("127.0.0.1", "localhost", "0.0.0.0")

    if request.base_url.hostname not in local_hostnames:
        official = await call_official(request, path)
        return await return_edited_response(official)

    return {"status": "proxy running ok"}
|