refactored

This commit is contained in:
Mathieu Broillet 2025-08-25 18:24:54 +02:00
parent 74109c8cbf
commit bf4cd556d1
Signed by: mathieub
GPG Key ID: 4428608CDA3A98D3
6 changed files with 154 additions and 172 deletions

View File

@ -51,8 +51,9 @@ subjectAltName = @alt_names
[alt_names]
DNS.1 = plex.tv
DNS.2 = clients.plex.tv
DNS.3 = app.plex.tv
DNS.2 = *.plex.tv
DNS.3 = *.provider.plex.tv
EOL
# Create a certificate signing request (CSR) using the SAN config

View File

@ -1,7 +1,35 @@
from datetime import datetime, timezone, timedelta
import xml.etree.ElementTree as ET
from datetime import datetime, timezone, timedelta
from enum import Enum
OFFICIAL_API = "https://clients.plex.tv"
class Hosts(str, Enum):
PLEX = "plex.tv"
CLIENTS = "clients.plex.tv"
APP = "app.plex.tv"
COMMUNITY = "community.plex.tv"
TOGETHER = "together.plex.tv"
FEATURES = "features.plex.tv"
EPGPROVIDER = "epg.provider.plex.tv"
DISCOVERPROVIDER = "discover.provider.plex.tv"
VODPROVIDER = "vod.provider.plex.tv"
METADATAPROVIDER = "metadata.provider.plex.tv"
ANALYTICS = "analytics.plex.tv"
# Hop-by-hop headers we must strip
HOP_BY_HOP_HEADERS = {
"connection",
"keep-alive",
"proxy-authenticate",
"proxy-authorization",
"te",
"trailer",
"transfer-encoding",
"upgrade",
"content-length", # let httpx calculate
"host", # must be rewritten
}
FEATURES_DICT = [{"id": "guided-upgrade", "uuid": "c9d9b7ee-fdd9-474e-b143-5039c04e9b9b"},
{"id": "increase-password-complexity", "uuid": "9e93f8a8-7ccd-4d15-99fa-76a158027660"},

BIN
favicon.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.4 KiB

View File

@ -1,118 +1,18 @@
import xml.etree.ElementTree as ET
import httpx
from fastapi import FastAPI, Request
from fastapi.responses import Response
from starlette.responses import JSONResponse
from starlette.responses import FileResponse
from client import AsyncCustomHost, NameSolver
from const import FEATURES_DICT, ENTITLEMENTS, CURRENT_DATE_MINUS_ONE_DAY, FEATURES, \
CURRENT_DATE_MINUS_ONE_DAY_Z, NEXT_MONTH_DATE_Z, PLEXAMP_FEATURES, TIMESTAMP_CURRENT_PLUS_30DAYS, \
TIMESTAMP_CURRENT_MINUS_30MIN, PLEXAMP_SUBSCRIPTION_XML
TIMESTAMP_CURRENT_MINUS_30MIN, PLEXAMP_SUBSCRIPTION_XML, Hosts
from utils import call_official, return_edited_response, host_required
app = FastAPI()
# Configure upstream client
official_client = httpx.AsyncClient(
verify=True,
timeout=httpx.Timeout(30.0, connect=10.0),
limits=httpx.Limits(max_keepalive_connections=50, max_connections=100),
transport=AsyncCustomHost(NameSolver())
)
# Hop-by-hop headers we must strip
HOP_BY_HOP_HEADERS = {
"connection",
"keep-alive",
"proxy-authenticate",
"proxy-authorization",
"te",
"trailer",
"transfer-encoding",
"upgrade",
"content-length", # let httpx calculate
"host", # must be rewritten
}
async def call_official(request: Request, path: str) -> httpx.Response:
"""Forward the incoming request to the official API and return the response."""
print(f"Processing request to {str(request.url)}")
# Copy request body
body = await request.body()
# Copy headers, filtering hop-by-hop
req_headers = {
k: v
for k, v in request.headers.items()
if k.lower() not in HOP_BY_HOP_HEADERS
}
# Rewrite Host header to match upstream
req_headers["host"] = request.url.hostname
# Forward request upstream
upstream_response = await official_client.request(
method=request.method,
url=f"{str(request.base_url).rstrip('/').replace("http://", "https://")}/{path}",
headers=req_headers,
content=body,
params=request.query_params,
cookies=request.cookies,
)
# Filter response headers hop-by-hop
resp_headers = {
k: v
for k, v in upstream_response.headers.items()
if k.lower() not in HOP_BY_HOP_HEADERS
}
upstream_response.headers = resp_headers
return upstream_response
async def return_edited_response(
response: httpx.Response,
new_content: dict | bytes | None = None
) -> Response:
"""
Return possibly modified response.
If new_content is a dict and the response is JSON, it will merge/override keys.
If new_content and response data are lists, new_content will replace the entire response data.
If new_content is bytes or str, it will replace the response body entirely.
"""
content_type = response.headers.get("content-type", "")
response.headers.pop("content-encoding", None) # body is already decoded
if content_type.startswith("application/json"):
data = response.json()
if isinstance(new_content, dict): # allow overrides
data = {**data, **new_content}
if isinstance(data, list) and isinstance(new_content, list):
data = new_content # replace entire list
return JSONResponse(
content=data,
status_code=response.status_code,
headers=response.headers
)
else:
content = new_content if isinstance(new_content, (bytes, str)) else response.content
return Response(
content=content,
status_code=response.status_code,
headers=response.headers,
media_type=content_type
)
@app.get("/api/v2/home")
async def fake_get_home(request: Request):
@app.get("/api/v2/home", tags=[Hosts.CLIENTS.value])
async def fake_get_home(request: Request, _=host_required([Hosts.CLIENTS])):
upstream_response = await call_official(request, request.url.path.lstrip("/"))
data_override = {
@ -122,8 +22,8 @@ async def fake_get_home(request: Request):
return await return_edited_response(upstream_response, data_override)
@app.get("/api/v2/features")
async def fake_get_features(request: Request):
@app.get("/api/v2/features", tags=[Hosts.CLIENTS.value])
async def fake_get_features(request: Request, _=host_required([Hosts.CLIENTS])):
upstream_response = await call_official(request, request.url.path.lstrip("/"))
data_override = FEATURES_DICT
@ -131,8 +31,8 @@ async def fake_get_features(request: Request):
return await return_edited_response(upstream_response, data_override)
@app.get("/api/v2/user")
async def fake_get_user(request: Request):
@app.get("/api/v2/user", tags=[Hosts.CLIENTS.value])
async def fake_get_user(request: Request, _=host_required([Hosts.CLIENTS])):
upstream_response = await call_official(request, request.url.path.lstrip("/"))
data_override = None
@ -164,7 +64,7 @@ async def fake_get_user(request: Request):
subscription.set("status", "Active")
subscription.set("paymentService", "braintree")
subscription.set("plan", "monthly")
subscription.set("paymentNotificationId", "3421431")
subscription.set("paymentNotificationId", "1234567")
features = subscription.find("features")
if features is not None:
@ -217,8 +117,9 @@ async def fake_get_user(request: Request):
return await return_edited_response(upstream_response, data_override)
@app.post("/api/v2/users/signin")
async def fake_user_signin(request: Request):
@app.post("/api/v2/users/signin", tags=[Hosts.CLIENTS.value, Hosts.PLEX.value], name="Fake signin")
@app.get("/api/v2/user.json", tags=[Hosts.PLEX.value], name="Fake user.json", description="Used by Plexamp")
async def fake_user_json_or_signin(request: Request, _=host_required([Hosts.CLIENTS, Hosts.PLEX])):
upstream_response = await call_official(request, request.url.path.lstrip("/"))
data_override = None
@ -268,59 +169,8 @@ async def fake_user_signin(request: Request):
return await return_edited_response(upstream_response, data_override)
@app.get("/api/v2/user.json")
async def fake_get_user_json(request: Request):
upstream_response = await call_official(request, request.url.path.lstrip("/"))
data_override = None
content_type = upstream_response.headers.get("content-type", "")
if content_type.startswith("application/json"):
data_override = {
"subscriptionDescription": "Monthly Plex Pass",
"roles": ["plexpass"],
"entitlements": ENTITLEMENTS,
"subscription": {
"active": True,
"subscribedAt": CURRENT_DATE_MINUS_ONE_DAY_Z,
"status": "Active",
"paymentService": "braintree",
"plan": "monthly",
"paymentNotificationId": "1234567",
"canUpgrade": True,
"features": PLEXAMP_FEATURES if "plexamp" in request.headers.get(
"X-Plex-Product").lower() else FEATURES_DICT
},
"subscriptions": [
{
"id": 1234567,
"mode": "monthly",
"startsAt": TIMESTAMP_CURRENT_MINUS_30MIN,
"renewsAt": TIMESTAMP_CURRENT_PLUS_30DAYS,
"endsAt": None,
"billing": {
"paymentMethodId": 1234567,
"internalPaymentMethod": {}
},
"canceled": False,
"gracePeriod": False,
"onHold": False,
"canReactivate": False,
"canUpgrade": False,
"canDowngrade": False,
"canConvert": False,
"type": "plexpass",
"braintreeId": "xyzxyz",
"state": "active"
}
]
}
return await return_edited_response(upstream_response, data_override)
@app.post("/api/claim/exchange")
async def fake_claim_exchange(request: Request):
@app.post("/api/claim/exchange", tags=[Hosts.PLEX.value])
async def fake_claim_exchange(request: Request, _=host_required([Hosts.PLEX])):
upstream_response = await call_official(request, request.url.path.lstrip("/"))
data_override = None
@ -356,13 +206,16 @@ async def fake_claim_exchange(request: Request):
return await return_edited_response(upstream_response, data_override)
@app.get("/api/hack")
async def hack_endpoint():
return {"status": "OK, Plex Pass features proxy enabled"}
@app.get("/favicon.ico")
async def favicon():
return FileResponse("favicon.ico")
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
async def catch_all(request: Request, path: str):
if path == "":
return {"status": "PROXY ONLINE"}
upstream_response = await call_official(request, path)
return await return_edited_response(upstream_response)

100
utils.py Normal file
View File

@ -0,0 +1,100 @@
import httpx
from fastapi import Request, Depends, HTTPException
from fastapi.responses import Response
from starlette.responses import JSONResponse
from const import Hosts, HOP_BY_HOP_HEADERS
from http_client_static_dns import AsyncCustomHost, NameSolver
# Configure upstream client
official_client = httpx.AsyncClient(
verify=True,
timeout=httpx.Timeout(30.0, connect=10.0),
limits=httpx.Limits(max_keepalive_connections=50, max_connections=100),
transport=AsyncCustomHost(NameSolver())
)
def host_required(allowed_hosts: list[Hosts]):
async def dependency(request: Request):
host = request.url.hostname
if host not in [h.value for h in allowed_hosts]:
raise HTTPException(status_code=403, detail=f"Host '{host}' not allowed")
return Depends(dependency)
async def call_official(request: Request, path: str) -> httpx.Response:
"""Forward the incoming request to the official API and return the response."""
print(f"Processing request to {str(request.url)}")
# Copy request body
body = await request.body()
# Copy headers, filtering hop-by-hop
req_headers = {
k: v
for k, v in request.headers.items()
if k.lower() not in HOP_BY_HOP_HEADERS
}
# Rewrite Host header to match upstream
req_headers["host"] = request.url.hostname
# Forward request upstream
upstream_response = await official_client.request(
method=request.method,
url=f"{str(request.base_url).rstrip('/').replace("http://", "https://")}/{path}",
headers=req_headers,
content=body,
params=request.query_params,
cookies=request.cookies,
)
# Filter response headers hop-by-hop
resp_headers = {
k: v
for k, v in upstream_response.headers.items()
if k.lower() not in HOP_BY_HOP_HEADERS
}
upstream_response.headers = resp_headers
return upstream_response
async def return_edited_response(
response: httpx.Response,
new_content: dict | bytes | None = None
) -> Response:
"""
Return possibly modified response.
If new_content is a dict and the response is JSON, it will merge/override keys.
If new_content and response data are lists, new_content will replace the entire response data.
If new_content is bytes or str, it will replace the response body entirely.
"""
content_type = response.headers.get("content-type", "")
response.headers.pop("content-encoding", None) # body is already decoded
if content_type.startswith("application/json"):
data = response.json()
if isinstance(new_content, dict): # allow overrides
data = {**data, **new_content}
if isinstance(data, list) and isinstance(new_content, list):
data = new_content # replace entire list
return JSONResponse(
content=data,
status_code=response.status_code,
headers=response.headers
)
else:
content = new_content if isinstance(new_content, (bytes, str)) else response.content
return Response(
content=content,
status_code=response.status_code,
headers=response.headers,
media_type=content_type
)