diff --git a/proxy.py b/proxy.py index 11625cc20..07baac656 100644 --- a/proxy.py +++ b/proxy.py @@ -20,7 +20,12 @@ # ## -from mitmproxy import http +import collections +import random +from mitmproxy import http, connection, ctx, tls +from abc import ABC, abstractmethod +from enum import Enum +from mitmproxy.utils import human from proxy_config import USE_SSL from proxy_config import REMOTE_HOST from proxy_config import REMOTE_PORT @@ -71,6 +76,80 @@ class MlgmXyysd_Genshin_Impact_Proxy: flow.request.host = REMOTE_HOST flow.request.port = REMOTE_PORT +class InterceptionResult(Enum): + SUCCESS = 1 + FAILURE = 2 + SKIPPED = 3 + + +class TlsStrategy(ABC): + def __init__(self): + self.history = collections.defaultdict(lambda: collections.deque(maxlen=200)) + + @abstractmethod + def should_intercept(self, server_address: connection.Address) -> bool: + raise NotImplementedError() + + def record_success(self, server_address): + self.history[server_address].append(InterceptionResult.SUCCESS) + + def record_failure(self, server_address): + self.history[server_address].append(InterceptionResult.FAILURE) + + def record_skipped(self, server_address): + self.history[server_address].append(InterceptionResult.SKIPPED) + + +class ConservativeStrategy(TlsStrategy): + def should_intercept(self, server_address: connection.Address) -> bool: + return InterceptionResult.FAILURE not in self.history[server_address] + + +class ProbabilisticStrategy(TlsStrategy): + def __init__(self, p: float): + self.p = p + super().__init__() + + def should_intercept(self, server_address: connection.Address) -> bool: + return random.uniform(0, 1) < self.p + + +class MaybeTls: + strategy: TlsStrategy + + def load(self, l): + l.add_option( + "tls_strategy", int, 0, + "TLS passthrough strategy. If set to 0, connections will be passed through after the first unsuccessful " + "handshake. If set to 0 < p <= 100, connections with be passed through with probability p.", + ) + + def configure(self, updated): + if "tls_strategy" not in updated: + return + if ctx.options.tls_strategy > 0: + self.strategy = ProbabilisticStrategy(ctx.options.tls_strategy / 100) + else: + self.strategy = ConservativeStrategy() + + def tls_clienthello(self, data: tls.ClientHelloData): + server_address = data.context.server.peername + if not self.strategy.should_intercept(server_address): + ctx.log(f"TLS passthrough: {human.format_address(server_address)}.") + data.ignore_connection = True + self.strategy.record_skipped(server_address) + + def tls_established_client(self, data: tls.TlsData): + server_address = data.context.server.peername + ctx.log(f"TLS handshake successful: {human.format_address(server_address)}") + self.strategy.record_success(server_address) + + def tls_failed_client(self, data: tls.TlsData): + server_address = data.context.server.peername + ctx.log(f"TLS handshake failed: {human.format_address(server_address)}") + self.strategy.record_failure(server_address) + addons = [ - MlgmXyysd_Genshin_Impact_Proxy() + MlgmXyysd_Genshin_Impact_Proxy(), + MaybeTls() ]