move ClientHello to mitmproxy.tls

This commit is contained in:
Maximilian Hils 2021-09-04 16:10:39 +02:00
parent 9f39e2f387
commit bdf4e31c58
6 changed files with 112 additions and 107 deletions

View File

@ -1,4 +1,3 @@
import io
import ipaddress
import os
import threading
@ -11,12 +10,9 @@ import certifi
from OpenSSL.crypto import X509
from cryptography.hazmat.primitives.asymmetric import rsa
from kaitaistruct import KaitaiStream
from OpenSSL import SSL, crypto
from mitmproxy import certs
from mitmproxy.contrib.kaitaistruct import tls_client_hello
from mitmproxy.net import check
# redeclared here for strict type checking
@ -279,49 +275,3 @@ def is_tls_record_magic(d):
d[1] == 0x03 and
0x0 <= d[2] <= 0x03
)
class ClientHello:
def __init__(self, raw_client_hello):
self._client_hello = tls_client_hello.TlsClientHello(
KaitaiStream(io.BytesIO(raw_client_hello))
)
@property
def cipher_suites(self) -> List[int]:
return self._client_hello.cipher_suites.cipher_suites
@property
def sni(self) -> Optional[str]:
if self._client_hello.extensions:
for extension in self._client_hello.extensions.extensions:
is_valid_sni_extension = (
extension.type == 0x00 and
len(extension.body.server_names) == 1 and
extension.body.server_names[0].name_type == 0 and
check.is_valid_host(extension.body.server_names[0].host_name)
)
if is_valid_sni_extension:
return extension.body.server_names[0].host_name.decode("ascii")
return None
@property
def alpn_protocols(self) -> List[bytes]:
if self._client_hello.extensions:
for extension in self._client_hello.extensions.extensions:
if extension.type == 0x10:
return list(x.name for x in extension.body.alpn_protocols)
return []
@property
def extensions(self) -> List[Tuple[int, bytes]]:
ret = []
if self._client_hello.extensions:
for extension in self._client_hello.extensions.extensions:
body = getattr(extension, "_raw_body", extension.body)
ret.append((extension.type, body))
return ret
def __repr__(self):
return f"ClientHello(sni: {self.sni}, alpn_protocols: {self.alpn_protocols})"

View File

@ -4,8 +4,8 @@ from dataclasses import dataclass
from typing import Iterator, Literal, Optional, Tuple
from OpenSSL import SSL
from mitmproxy.tls import ClientHello
from mitmproxy import certs, connection
from mitmproxy.net import tls as net_tls
from mitmproxy.proxy import commands, events, layer, tunnel
from mitmproxy.proxy import context
from mitmproxy.proxy.commands import StartHook
@ -70,7 +70,7 @@ def get_client_hello(data: bytes) -> Optional[bytes]:
return None
def parse_client_hello(data: bytes) -> Optional[net_tls.ClientHello]:
def parse_client_hello(data: bytes) -> Optional[ClientHello]:
"""
Check if the supplied bytes contain a full ClientHello message,
and if so, parse it.
@ -86,7 +86,7 @@ def parse_client_hello(data: bytes) -> Optional[net_tls.ClientHello]:
client_hello = get_client_hello(data)
if client_hello:
try:
return net_tls.ClientHello(client_hello[4:])
return ClientHello(client_hello[4:])
except EOFError as e:
raise ValueError("Invalid ClientHello") from e
return None
@ -102,7 +102,7 @@ HTTP_ALPNS = (b"h2",) + HTTP1_ALPNS
class ClientHelloData:
context: context.Context
"""The context object for this connection."""
client_hello: net_tls.ClientHello
client_hello: ClientHello
"""The entire parsed TLS ClientHello."""
ignore_connection: bool = False
"""

53
mitmproxy/tls.py Normal file
View File

@ -0,0 +1,53 @@
import io
from typing import List, Optional, Tuple
from kaitaistruct import KaitaiStream
from mitmproxy.contrib.kaitaistruct import tls_client_hello
from mitmproxy.net import check
class ClientHello:
def __init__(self, raw_client_hello):
self._client_hello = tls_client_hello.TlsClientHello(
KaitaiStream(io.BytesIO(raw_client_hello))
)
@property
def cipher_suites(self) -> List[int]:
return self._client_hello.cipher_suites.cipher_suites
@property
def sni(self) -> Optional[str]:
if self._client_hello.extensions:
for extension in self._client_hello.extensions.extensions:
is_valid_sni_extension = (
extension.type == 0x00 and
len(extension.body.server_names) == 1 and
extension.body.server_names[0].name_type == 0 and
check.is_valid_host(extension.body.server_names[0].host_name)
)
if is_valid_sni_extension:
return extension.body.server_names[0].host_name.decode("ascii")
return None
@property
def alpn_protocols(self) -> List[bytes]:
if self._client_hello.extensions:
for extension in self._client_hello.extensions.extensions:
if extension.type == 0x10:
return list(x.name for x in extension.body.alpn_protocols)
return []
@property
def extensions(self) -> List[Tuple[int, bytes]]:
ret = []
if self._client_hello.extensions:
for extension in self._client_hello.extensions.extensions:
body = getattr(extension, "_raw_body", extension.body)
ret.append((extension.type, body))
return ret
def __repr__(self):
return f"ClientHello(sni: {self.sni}, alpn_protocols: {self.alpn_protocols})"

View File

@ -4,17 +4,6 @@ from OpenSSL import SSL
from mitmproxy import certs
from mitmproxy.net import tls
CLIENT_HELLO_NO_EXTENSIONS = bytes.fromhex(
"03015658a756ab2c2bff55f636814deac086b7ca56b65058c7893ffc6074f5245f70205658a75475103a152637"
"78e1bb6d22e8bbd5b6b0a3a59760ad354e91ba20d353001a0035002f000a000500040009000300060008006000"
"61006200640100"
)
FULL_CLIENT_HELLO_NO_EXTENSIONS = (
b"\x16\x03\x03\x00\x65" # record layer
b"\x01\x00\x00\x61" + # handshake header
CLIENT_HELLO_NO_EXTENSIONS
)
def test_make_master_secret_logger():
assert tls.make_master_secret_logger(None) is None
@ -84,43 +73,3 @@ def test_is_record_magic():
assert tls.is_tls_record_magic(b"\x16\x03\x01")
assert tls.is_tls_record_magic(b"\x16\x03\x02")
assert tls.is_tls_record_magic(b"\x16\x03\x03")
class TestClientHello:
def test_no_extensions(self):
c = tls.ClientHello(CLIENT_HELLO_NO_EXTENSIONS)
assert repr(c)
assert c.sni is None
assert c.cipher_suites == [53, 47, 10, 5, 4, 9, 3, 6, 8, 96, 97, 98, 100]
assert c.alpn_protocols == []
assert c.extensions == []
def test_extensions(self):
data = bytes.fromhex(
"03033b70638d2523e1cba15f8364868295305e9c52aceabda4b5147210abc783e6e1000022c02bc02fc02cc030"
"cca9cca8cc14cc13c009c013c00ac014009c009d002f0035000a0100006cff0100010000000010000e00000b65"
"78616d706c652e636f6d0017000000230000000d00120010060106030501050304010403020102030005000501"
"00000000001200000010000e000c02683208687474702f312e3175500000000b00020100000a00080006001d00"
"170018"
)
c = tls.ClientHello(data)
assert repr(c)
assert c.sni == 'example.com'
assert c.cipher_suites == [
49195, 49199, 49196, 49200, 52393, 52392, 52244, 52243, 49161,
49171, 49162, 49172, 156, 157, 47, 53, 10
]
assert c.alpn_protocols == [b'h2', b'http/1.1']
assert c.extensions == [
(65281, b'\x00'),
(0, b'\x00\x0e\x00\x00\x0bexample.com'),
(23, b''),
(35, b''),
(13, b'\x00\x10\x06\x01\x06\x03\x05\x01\x05\x03\x04\x01\x04\x03\x02\x01\x02\x03'),
(5, b'\x01\x00\x00\x00\x00'),
(18, b''),
(16, b'\x00\x0c\x02h2\x08http/1.1'),
(30032, b''),
(11, b'\x01\x00'),
(10, b'\x00\x06\x00\x1d\x00\x17\x00\x18')
]

View File

@ -1,7 +1,7 @@
from hypothesis import given, example
from hypothesis.strategies import binary, integers
from mitmproxy.net.tls import ClientHello
from mitmproxy.tls import ClientHello
from mitmproxy.proxy.layers.tls import parse_client_hello
client_hello_with_extensions = bytes.fromhex(
@ -17,7 +17,7 @@ client_hello_with_extensions = bytes.fromhex(
@given(i=integers(0, len(client_hello_with_extensions)), data=binary())
@example(i=183, data=b'\x00\x00\x00\x00\x00\x00\x00\x00\x00')
def test_fuzz_h2_request_chunks(i, data):
def test_fuzz_parse_client_hello(i, data):
try:
ch = parse_client_hello(client_hello_with_extensions[:i] + data)
except ValueError:

View File

@ -0,0 +1,53 @@
from mitmproxy import tls
CLIENT_HELLO_NO_EXTENSIONS = bytes.fromhex(
"03015658a756ab2c2bff55f636814deac086b7ca56b65058c7893ffc6074f5245f70205658a75475103a152637"
"78e1bb6d22e8bbd5b6b0a3a59760ad354e91ba20d353001a0035002f000a000500040009000300060008006000"
"61006200640100"
)
FULL_CLIENT_HELLO_NO_EXTENSIONS = (
b"\x16\x03\x03\x00\x65" # record layer
b"\x01\x00\x00\x61" + # handshake header
CLIENT_HELLO_NO_EXTENSIONS
)
class TestClientHello:
def test_no_extensions(self):
c = tls.ClientHello(CLIENT_HELLO_NO_EXTENSIONS)
assert repr(c)
assert c.sni is None
assert c.cipher_suites == [53, 47, 10, 5, 4, 9, 3, 6, 8, 96, 97, 98, 100]
assert c.alpn_protocols == []
assert c.extensions == []
def test_extensions(self):
data = bytes.fromhex(
"03033b70638d2523e1cba15f8364868295305e9c52aceabda4b5147210abc783e6e1000022c02bc02fc02cc030"
"cca9cca8cc14cc13c009c013c00ac014009c009d002f0035000a0100006cff0100010000000010000e00000b65"
"78616d706c652e636f6d0017000000230000000d00120010060106030501050304010403020102030005000501"
"00000000001200000010000e000c02683208687474702f312e3175500000000b00020100000a00080006001d00"
"170018"
)
c = tls.ClientHello(data)
assert repr(c)
assert c.sni == 'example.com'
assert c.cipher_suites == [
49195, 49199, 49196, 49200, 52393, 52392, 52244, 52243, 49161,
49171, 49162, 49172, 156, 157, 47, 53, 10
]
assert c.alpn_protocols == [b'h2', b'http/1.1']
assert c.extensions == [
(65281, b'\x00'),
(0, b'\x00\x0e\x00\x00\x0bexample.com'),
(23, b''),
(35, b''),
(13, b'\x00\x10\x06\x01\x06\x03\x05\x01\x05\x03\x04\x01\x04\x03\x02\x01\x02\x03'),
(5, b'\x01\x00\x00\x00\x00'),
(18, b''),
(16, b'\x00\x0c\x02h2\x08http/1.1'),
(30032, b''),
(11, b'\x01\x00'),
(10, b'\x00\x06\x00\x1d\x00\x17\x00\x18')
]