sans-io adjustments

This commit is contained in:
Maximilian Hils 2020-12-11 13:47:47 +01:00
parent 39b0b6c110
commit 01f57346ee
9 changed files with 73 additions and 36 deletions

View File

@ -2,6 +2,7 @@ import os
import time
import typing
import uuid
import warnings
from mitmproxy import certs
from mitmproxy import exceptions
@ -19,7 +20,6 @@ class ClientConnection(tcp.BaseHandler, stateobject.StateObject):
Attributes:
address: Remote address
tls_established: True if TLS is established, False otherwise
clientcert: The TLS client certificate
mitmcert: The MITM'ed TLS server certificate presented to the client
timestamp_start: Connection start timestamp
timestamp_tls_setup: TLS established timestamp
@ -42,7 +42,6 @@ class ClientConnection(tcp.BaseHandler, stateobject.StateObject):
self.wfile = None
self.rfile = None
self.address = None
self.clientcert = None
self.tls_established = None
self.id = str(uuid.uuid4())
@ -91,7 +90,7 @@ class ClientConnection(tcp.BaseHandler, stateobject.StateObject):
sockname = ("", 0)
error = None
tls = None
certificate_list = None
certificate_list = ()
alpn_offers = None
cipher_list = None
@ -99,7 +98,6 @@ class ClientConnection(tcp.BaseHandler, stateobject.StateObject):
id=str,
address=tuple,
tls_established=bool,
clientcert=certs.Cert,
mitmcert=certs.Cert,
timestamp_start=float,
timestamp_tls_setup=float,
@ -119,6 +117,22 @@ class ClientConnection(tcp.BaseHandler, stateobject.StateObject):
cipher_list=typing.List[str],
)
@property
def clientcert(self) -> typing.Optional[certs.Cert]: # pragma: no cover
warnings.warn(".clientcert is deprecated, use .certificate_list instead.", PendingDeprecationWarning)
if self.certificate_list:
return self.certificate_list[0]
else:
return None
@clientcert.setter
def clientcert(self, val): # pragma: no cover
warnings.warn(".clientcert is deprecated, use .certificate_list instead.", PendingDeprecationWarning)
if val:
self.certificate_list = [val]
else:
self.certificate_list = []
def send(self, message):
if isinstance(message, list):
message = b''.join(message)
@ -136,7 +150,6 @@ class ClientConnection(tcp.BaseHandler, stateobject.StateObject):
return cls.from_state(dict(
id=str(uuid.uuid4()),
address=address,
clientcert=None,
mitmcert=None,
tls_established=False,
timestamp_start=None,
@ -192,7 +205,6 @@ class ServerConnection(tcp.TCPClient, stateobject.StateObject):
ip_address: Resolved remote IP address.
source_address: Local IP address or client's source IP address.
tls_established: True if TLS is established, False otherwise
cert: The certificate presented by the remote during the TLS handshake
sni: Server Name Indication sent by the proxy during the TLS handshake
alpn_proto_negotiated: The negotiated application protocol
tls_version: TLS version
@ -249,7 +261,7 @@ class ServerConnection(tcp.TCPClient, stateobject.StateObject):
state = 0
error = None
tls = None
certificate_list = None
certificate_list = ()
alpn_offers = None
cipher_name = None
cipher_list = None
@ -261,7 +273,6 @@ class ServerConnection(tcp.TCPClient, stateobject.StateObject):
ip_address=tuple,
source_address=tuple,
tls_established=bool,
cert=certs.Cert,
sni=str,
alpn_proto_negotiated=bytes,
tls_version=str,
@ -280,6 +291,22 @@ class ServerConnection(tcp.TCPClient, stateobject.StateObject):
via2=None,
)
@property
def cert(self) -> typing.Optional[certs.Cert]: # pragma: no cover
warnings.warn(".cert is deprecated, use .certificate_list instead.", PendingDeprecationWarning)
if self.certificate_list:
return self.certificate_list[0]
else:
return None
@cert.setter
def cert(self, val): # pragma: no cover
warnings.warn(".cert is deprecated, use .certificate_list instead.", PendingDeprecationWarning)
if val:
self.certificate_list = [val]
else:
self.certificate_list = []
@classmethod
def from_state(cls, state):
f = cls(tuple())
@ -292,7 +319,6 @@ class ServerConnection(tcp.TCPClient, stateobject.StateObject):
id=str(uuid.uuid4()),
address=address,
ip_address=address,
cert=None,
sni=address[0],
alpn_proto_negotiated=None,
tls_version=None,

View File

@ -211,13 +211,13 @@ def convert_9_10(data):
def conv_cconn(conn):
conn["sockname"] = ("", 0)
cc = conn["clientcert"]
conn["certificate_list"] = [cc] if cc else None
cc = conn.pop("clientcert", None)
conn["certificate_list"] = [cc] if cc else []
conv_conn(conn)
def conv_sconn(conn):
crt = conn["cert"]
conn["certificate_list"] = [crt] if crt else None
crt = conn.pop("cert", None)
conn["certificate_list"] = [crt] if crt else []
conn["cipher_name"] = None
conn["via2"] = None
conv_conn(conn)

View File

@ -6,9 +6,9 @@ from mitmproxy import tcp
from mitmproxy import websocket
from mitmproxy import controller
from mitmproxy import http
from mitmproxy import connections
from mitmproxy import flow
from mitmproxy.net import http as net_http
from mitmproxy.utils import compat
from wsproto.frame_protocol import Opcode
@ -147,16 +147,12 @@ def tdummyflow(client_conn=True, server_conn=True, err=None):
return f
def tclient_conn():
"""
@return: mitmproxy.proxy.connection.ClientConnection
"""
c = connections.ClientConnection.from_state(dict(
def tclient_conn() -> compat.Client:
c = compat.Client.from_state(dict(
id=str(uuid.uuid4()),
address=("127.0.0.1", 22),
clientcert=None,
mitmcert=None,
tls_established=False,
tls_established=True,
timestamp_start=946681200,
timestamp_tls_setup=946681201,
timestamp_end=946681206,
@ -179,21 +175,17 @@ def tclient_conn():
return c
def tserver_conn():
"""
@return: mitmproxy.proxy.connection.ServerConnection
"""
c = connections.ServerConnection.from_state(dict(
def tserver_conn() -> compat.Server:
c = compat.Server.from_state(dict(
id=str(uuid.uuid4()),
address=("address", 22),
source_address=("address", 22),
ip_address=("192.168.0.1", 22),
cert=None,
timestamp_start=946681202,
timestamp_tcp_setup=946681203,
timestamp_tls_setup=946681204,
timestamp_end=946681205,
tls_established=False,
tls_established=True,
sni="address",
alpn_proto_negotiated=None,
tls_version="TLSv1.2",

View File

@ -1,7 +1,7 @@
import os
import socket
from mitmproxy.utils import data
from mitmproxy.utils import data, compat
import pytest
@ -22,6 +22,11 @@ skip_appveyor = pytest.mark.skipif(
reason='Skipping due to Appveyor'
)
skip_new_proxy_core = pytest.mark.skipif(
compat.new_proxy_core,
reason='Skipping legacy test for old proxy core'
)
try:
s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
s.bind(("::1", 0))

View File

@ -10,6 +10,7 @@ from mitmproxy.addons import clientplayback
from mitmproxy.test import taddons
from .. import tservers
from ...conftest import skip_new_proxy_core
def tdump(path, flows):
@ -48,6 +49,7 @@ class TBase(tservers.HTTPProxyTest):
def addons(self):
return [clientplayback.ClientPlayback()]
@skip_new_proxy_core
def test_replay(self):
cr = self.master.addons.get("clientplayback")
@ -164,6 +166,7 @@ class TestClientPlayback:
assert cp.count() == 0
await ctx.master.await_log("live")
@skip_new_proxy_core
def test_http2(self):
cp = clientplayback.ClientPlayback()
with taddons.context(cp):

View File

@ -39,14 +39,14 @@ def test_extract(tdata):
["client_conn.address.host", "127.0.0.1"],
["client_conn.tls_version", "TLSv1.2"],
["client_conn.sni", "address"],
["client_conn.tls_established", "false"],
["client_conn.tls_established", "true"],
["server_conn.address.port", "22"],
["server_conn.address.host", "address"],
["server_conn.ip_address.host", "192.168.0.1"],
["server_conn.tls_version", "TLSv1.2"],
["server_conn.sni", "address"],
["server_conn.tls_established", "false"],
["server_conn.tls_established", "true"],
]
for spec, expected in tests:
ret = cut.extract(spec, tf)

View File

@ -12,10 +12,12 @@ from mitmproxy.net.http import http1
from mitmproxy.test import tflow
from .net import tservers
from pathod import test
from ..conftest import skip_new_proxy_core
class TestClientConnection:
@skip_new_proxy_core
def test_send(self):
c = tflow.tclient_conn()
c.send(b'foobar')
@ -26,20 +28,22 @@ class TestClientConnection:
c.send(['string', 'not'])
assert c.wfile.getvalue() == b'foobarfoobar'
@skip_new_proxy_core
def test_repr(self):
c = tflow.tclient_conn()
assert '127.0.0.1:22' in repr(c)
assert 'ALPN' in repr(c)
assert 'TLS' not in repr(c)
assert 'TLS' in repr(c)
c.alpn_proto_negotiated = None
c.tls_established = True
c.tls_established = False
assert 'ALPN' not in repr(c)
assert 'TLS' in repr(c)
assert 'TLS' not in repr(c)
c.address = None
assert repr(c)
@skip_new_proxy_core
def test_tls_established_property(self):
c = tflow.tclient_conn()
c.tls_established = True
@ -82,6 +86,7 @@ class TestClientConnection:
class TestServerConnection:
@skip_new_proxy_core
def test_send(self):
c = tflow.tserver_conn()
c.send(b'foobar')
@ -92,6 +97,7 @@ class TestServerConnection:
c.send(['string', 'not'])
assert c.wfile.getvalue() == b'foobarfoobar'
@skip_new_proxy_core
def test_repr(self):
c = tflow.tserver_conn()
@ -115,6 +121,7 @@ class TestServerConnection:
c.address = None
assert repr(c)
@skip_new_proxy_core
def test_tls_established_property(self):
c = tflow.tserver_conn()
c.tls_established = True

View File

@ -10,7 +10,7 @@ from mitmproxy.proxy import config
from mitmproxy.proxy.server import ConnectionHandler, DummyServer, ProxyServer
from mitmproxy.tools import cmdline
from mitmproxy.tools import main
from ..conftest import skip_windows
from ..conftest import skip_windows, skip_new_proxy_core
class MockParser(argparse.ArgumentParser):
@ -74,6 +74,7 @@ class TestDummyServer:
class TestConnectionHandler:
@skip_new_proxy_core
def test_fatal_error(self, capsys):
opts = options.Options()
pconf = config.ProxyConfig(opts)

View File

@ -1,10 +1,12 @@
import io
import pytest
from mitmproxy.io import tnetstring
from mitmproxy import flowfilter
from mitmproxy.exceptions import Kill, ControlException
from mitmproxy.io import tnetstring
from mitmproxy.test import tflow
from ..conftest import skip_new_proxy_core
class TestWebSocketFlow:
@ -87,6 +89,7 @@ class TestWebSocketFlow:
tnetstring.dump(d, b)
assert b.getvalue()
@skip_new_proxy_core
def test_message_kill(self):
f = tflow.twebsocketflow()
assert not f.messages[-1].killed