import statistics
from typing import List, Tuple

from config import SYNC_CONFIG
from .types import AnalysisPoint


class Analyzer:
    @staticmethod
    def filter_outliers(points: List[AnalysisPoint]) -> List[AnalysisPoint]:
        """IQR filter to remove bad matches."""
        if len(points) < 4:
            return points
        offsets = sorted(p.offset_ms for p in points)
        # Coarse index-based quartiles; precise enough for outlier rejection.
        q1 = offsets[len(offsets) // 4]
        q3 = offsets[3 * len(offsets) // 4]
        iqr = q3 - q1
        lower, upper = q1 - (1.5 * iqr), q3 + (1.5 * iqr)
        return [p for p in points if lower <= p.offset_ms <= upper]
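
    # Worked example: offsets [0, 10, 11, 12, 13, 400] give q1 = 10, q3 = 13,
    # IQR = 3 and keep-range [5.5, 17.5], so the points at 0 and 400 are rejected.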

    @staticmethod
    def calculate_weighted_regression(points: List[AnalysisPoint]) -> Tuple[float, float, float]:
        """Returns (slope, intercept, r2), weighted by match confidence."""
        n = len(points)
        if n < 2:
            return 1.0, 0.0, 0.0

        x = [p.timestamp_ms for p in points]
        y = [p.timestamp_ms + p.offset_ms for p in points]
        w = [p.match_count for p in points]

        sum_w = sum(w)
        sum_wx = sum(wi * xi for wi, xi in zip(w, x))
        sum_wy = sum(wi * yi for wi, yi in zip(w, y))
        sum_wxx = sum(wi * xi * xi for wi, xi in zip(w, x))
        sum_wxy = sum(wi * xi * yi for wi, xi, yi in zip(w, x, y))

        denom = sum_w * sum_wxx - sum_wx * sum_wx
        if denom == 0:
            # Degenerate fit (all timestamps identical): fall back to identity.
            return 1.0, 0.0, 0.0

        slope = (sum_w * sum_wxy - sum_wx * sum_wy) / denom
        intercept = (sum_wy - slope * sum_wx) / sum_w

        # R2 is computed unweighted so fits remain comparable across runs.
        y_mean = sum(y) / n
        ss_tot = sum((yi - y_mean) ** 2 for yi in y)
        ss_res = sum((yi - (slope * xi + intercept)) ** 2 for xi, yi in zip(x, y))
        r2 = (1 - ss_res / ss_tot) if ss_tot != 0 else 0.0

        return slope, intercept, r2
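
    # The closed form above solves the weighted normal equations for y = a*x + b:
    #   a = (S_w * S_wxy - S_wx * S_wy) / (S_w * S_wxx - S_wx^2)
    #   b = (S_wy - a * S_wx) / S_w
    # With millisecond timestamps, a slope of 1.000036 means roughly
    # 0.000036 * 3,600,000 ≈ 130 ms of accumulated drift per hour.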

    @staticmethod
    def smooth_points(points: List[AnalysisPoint]) -> List[AnalysisPoint]:
        """Weighted 1-2-1 smoothing for Elastic mode."""
        if len(points) < 3:
            return points
        # Sort a copy so the caller's list is not reordered as a side effect.
        points = sorted(points, key=lambda p: p.timestamp_ms)
        smoothed = [points[0]]
        for i in range(1, len(points) - 1):
            prev, curr, next_p = points[i - 1], points[i], points[i + 1]
            # 1-2-1 kernel: each neighbour contributes 25%, the centre 50%.
            avg_offset = (prev.offset_ms * 0.25) + (curr.offset_ms * 0.5) + (next_p.offset_ms * 0.25)
            smoothed.append(AnalysisPoint(curr.timestamp_ms, avg_offset, curr.match_count))
        smoothed.append(points[-1])
        return smoothed
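
    # Example: neighbouring offsets (100, 120, 100) smooth the centre point to
    # 100 * 0.25 + 120 * 0.5 + 100 * 0.25 = 110 ms; endpoints pass through as-is.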

    @staticmethod
    def get_interpolated_offset(target_ms: int, anchors: List[AnalysisPoint]) -> float:
        """Linear interpolation between anchors; assumes a non-empty list sorted by timestamp_ms."""
        # Clamp to the outermost anchors instead of extrapolating.
        if target_ms <= anchors[0].timestamp_ms:
            return anchors[0].offset_ms
        if target_ms >= anchors[-1].timestamp_ms:
            return anchors[-1].offset_ms

        for i in range(len(anchors) - 1):
            p1, p2 = anchors[i], anchors[i + 1]
            if p1.timestamp_ms <= target_ms < p2.timestamp_ms:
                alpha = (target_ms - p1.timestamp_ms) / (p2.timestamp_ms - p1.timestamp_ms)
                return p1.offset_ms + (alpha * (p2.offset_ms - p1.offset_ms))
        # Defensive fallback; unreachable given the boundary checks above.
        return anchors[0].offset_ms
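
    # Example: anchors at (0 ms, +100 ms) and (10000 ms, +200 ms) give
    # alpha = 0.25 at target 2500 ms, i.e. an interpolated offset of +125 ms.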

    @staticmethod
    def decide_sync_strategy(points: List[AnalysisPoint]) -> str:
        """Picks 'CONSTANT', 'LINEAR', or 'ELASTIC' from the cleaned offsets."""
        clean_points = Analyzer.filter_outliers(points)
        if len(clean_points) < 2:
            return 'CONSTANT'

        offsets = [p.offset_ms for p in clean_points]
        std_dev = statistics.stdev(offsets) if len(offsets) > 1 else 0

        print("\nAnalysis Metrics (Cleaned Data):")
        print(f" Spread: {max(offsets) - min(offsets)}ms")
        print(f" StdDev: {std_dev:.2f}ms")

        if std_dev < SYNC_CONFIG['jitter_tolerance_ms']:
            print(" Decision: Offsets are stable (Low Jitter).")
            return 'CONSTANT'

        if not SYNC_CONFIG['fix_drift']:
            print(" Decision: Drift detected but 'fix_drift' is False.")
            return 'CONSTANT'

        slope, _, r2 = Analyzer.calculate_weighted_regression(clean_points)
        # A slope of exactly 1.0 means no drift; scale the deviation to ms/hour.
        drift_per_hour = abs(slope - 1.0) * 3600000
        print(f" Linear Fit: R2={r2:.4f}, Slope={slope:.6f} (Drift: {drift_per_hour:.0f}ms/hr)")

        if r2 >= SYNC_CONFIG['linear_r2_threshold'] and drift_per_hour > 100:
            print(" Decision: Linear drift detected.")
            return 'LINEAR'

        print(" Decision: Variable/irregular drift.")
        return 'ELASTIC'
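

# A minimal usage sketch with synthetic data. It assumes AnalysisPoint accepts
# (timestamp_ms, offset_ms, match_count) positionally, as the constructor call
# in smooth_points implies, and that SYNC_CONFIG defines the keys used above.
# Run as a module (python -m <package>.<module>) so the relative import resolves.
if __name__ == '__main__':
    # One point per minute, drifting ~2 ms per minute (~120 ms/hour).
    demo = [AnalysisPoint(t * 60_000, 100.0 + t * 2.0, 10) for t in range(10)]
    print(f"Strategy: {Analyzer.decide_sync_strategy(demo)}")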