diff --git a/mitmproxy/net/socks.py b/mitmproxy/net/socks.py deleted file mode 100644 index 0b2790df8..000000000 --- a/mitmproxy/net/socks.py +++ /dev/null @@ -1,231 +0,0 @@ -import struct -import array -import ipaddress - -from mitmproxy.net import check -from mitmproxy.coretypes import bidi - - -class SocksError(Exception): - def __init__(self, code, message): - super().__init__(message) - self.code = code - - -VERSION = bidi.BiDi( - SOCKS4=0x04, - SOCKS5=0x05 -) - -CMD = bidi.BiDi( - CONNECT=0x01, - BIND=0x02, - UDP_ASSOCIATE=0x03 -) - -ATYP = bidi.BiDi( - IPV4_ADDRESS=0x01, - DOMAINNAME=0x03, - IPV6_ADDRESS=0x04 -) - -REP = bidi.BiDi( - SUCCEEDED=0x00, - GENERAL_SOCKS_SERVER_FAILURE=0x01, - CONNECTION_NOT_ALLOWED_BY_RULESET=0x02, - NETWORK_UNREACHABLE=0x03, - HOST_UNREACHABLE=0x04, - CONNECTION_REFUSED=0x05, - TTL_EXPIRED=0x06, - COMMAND_NOT_SUPPORTED=0x07, - ADDRESS_TYPE_NOT_SUPPORTED=0x08, -) - -METHOD = bidi.BiDi( - NO_AUTHENTICATION_REQUIRED=0x00, - GSSAPI=0x01, - USERNAME_PASSWORD=0x02, - NO_ACCEPTABLE_METHODS=0xFF -) - -USERNAME_PASSWORD_VERSION = bidi.BiDi( - DEFAULT=0x01 -) - - -class ClientGreeting: - __slots__ = ("ver", "methods") - - def __init__(self, ver, methods): - self.ver = ver - self.methods = array.array("B") - self.methods.extend(methods) - - def assert_socks5(self): - if self.ver != VERSION.SOCKS5: - if self.ver == ord("G") and len(self.methods) == ord("E"): - guess = "Probably not a SOCKS request but a regular HTTP request. " - else: - guess = "" - - raise SocksError( - REP.GENERAL_SOCKS_SERVER_FAILURE, - guess + "Invalid SOCKS version. Expected 0x05, got 0x%x" % self.ver - ) - - @classmethod - def from_file(cls, f, fail_early=False): - """ - :param fail_early: If true, a SocksError will be raised if the first byte does not indicate socks5. - """ - ver, nmethods = struct.unpack("!BB", f.safe_read(2)) - client_greeting = cls(ver, []) - if fail_early: - client_greeting.assert_socks5() - client_greeting.methods.frombytes(f.safe_read(nmethods)) - return client_greeting - - def to_file(self, f): - f.write(struct.pack("!BB", self.ver, len(self.methods))) - f.write(self.methods.tobytes()) - - -class ServerGreeting: - __slots__ = ("ver", "method") - - def __init__(self, ver, method): - self.ver = ver - self.method = method - - def assert_socks5(self): - if self.ver != VERSION.SOCKS5: - if self.ver == ord("H") and self.method == ord("T"): - guess = "Probably not a SOCKS request but a regular HTTP response. " - else: - guess = "" - - raise SocksError( - REP.GENERAL_SOCKS_SERVER_FAILURE, - guess + "Invalid SOCKS version. Expected 0x05, got 0x%x" % self.ver - ) - - @classmethod - def from_file(cls, f): - ver, method = struct.unpack("!BB", f.safe_read(2)) - return cls(ver, method) - - def to_file(self, f): - f.write(struct.pack("!BB", self.ver, self.method)) - - -class UsernamePasswordAuth: - __slots__ = ("ver", "username", "password") - - def __init__(self, ver, username, password): - self.ver = ver - self.username = username - self.password = password - - def assert_authver1(self): - if self.ver != USERNAME_PASSWORD_VERSION.DEFAULT: - raise SocksError( - 0, - "Invalid auth version. Expected 0x01, got 0x%x" % self.ver - ) - - @classmethod - def from_file(cls, f): - ver, ulen = struct.unpack("!BB", f.safe_read(2)) - username = f.safe_read(ulen) - plen, = struct.unpack("!B", f.safe_read(1)) - password = f.safe_read(plen) - return cls(ver, username.decode(), password.decode()) - - def to_file(self, f): - f.write(struct.pack("!BB", self.ver, len(self.username))) - f.write(self.username.encode()) - f.write(struct.pack("!B", len(self.password))) - f.write(self.password.encode()) - - -class UsernamePasswordAuthResponse: - __slots__ = ("ver", "status") - - def __init__(self, ver, status): - self.ver = ver - self.status = status - - def assert_authver1(self): - if self.ver != USERNAME_PASSWORD_VERSION.DEFAULT: - raise SocksError( - 0, - "Invalid auth version. Expected 0x01, got 0x%x" % self.ver - ) - - @classmethod - def from_file(cls, f): - ver, status = struct.unpack("!BB", f.safe_read(2)) - return cls(ver, status) - - def to_file(self, f): - f.write(struct.pack("!BB", self.ver, self.status)) - - -class Message: - __slots__ = ("ver", "msg", "atyp", "addr") - - def __init__(self, ver, msg, atyp, addr): - self.ver = ver - self.msg = msg - self.atyp = atyp - self.addr = addr - - def assert_socks5(self): - if self.ver != VERSION.SOCKS5: - raise SocksError( - REP.GENERAL_SOCKS_SERVER_FAILURE, - "Invalid SOCKS version. Expected 0x05, got 0x%x" % self.ver - ) - - @classmethod - def from_file(cls, f): - ver, msg, rsv, atyp = struct.unpack("!BBBB", f.safe_read(4)) - if rsv != 0x00: - raise SocksError( - REP.GENERAL_SOCKS_SERVER_FAILURE, - "Socks Request: Invalid reserved byte: %s" % rsv - ) - if atyp == ATYP.IPV4_ADDRESS: - # We use tnoa here as ntop is not commonly available on Windows. - host = ipaddress.IPv4Address(f.safe_read(4)).compressed - elif atyp == ATYP.IPV6_ADDRESS: - host = ipaddress.IPv6Address(f.safe_read(16)).compressed - elif atyp == ATYP.DOMAINNAME: - length, = struct.unpack("!B", f.safe_read(1)) - host = f.safe_read(length) - if not check.is_valid_host(host): - raise SocksError(REP.GENERAL_SOCKS_SERVER_FAILURE, "Invalid hostname: %s" % host) - host = host.decode("idna") - else: - raise SocksError(REP.ADDRESS_TYPE_NOT_SUPPORTED, - "Socks Request: Unknown ATYP: %s" % atyp) - - port, = struct.unpack("!H", f.safe_read(2)) - addr = (host, port) - return cls(ver, msg, atyp, addr) - - def to_file(self, f): - f.write(struct.pack("!BBBB", self.ver, self.msg, 0x00, self.atyp)) - if self.atyp == ATYP.IPV4_ADDRESS: - f.write(ipaddress.IPv4Address(self.addr[0]).packed) - elif self.atyp == ATYP.IPV6_ADDRESS: - f.write(ipaddress.IPv6Address(self.addr[0]).packed) - elif self.atyp == ATYP.DOMAINNAME: - f.write(struct.pack("!B", len(self.addr[0]))) - f.write(self.addr[0].encode("idna")) - else: - raise SocksError( - REP.ADDRESS_TYPE_NOT_SUPPORTED, - "Unknown ATYP: %s" % self.atyp - ) - f.write(struct.pack("!H", self.addr[1])) diff --git a/test/mitmproxy/net/test_socks.py b/test/mitmproxy/net/test_socks.py deleted file mode 100644 index c6e2d1530..000000000 --- a/test/mitmproxy/net/test_socks.py +++ /dev/null @@ -1,210 +0,0 @@ -import ipaddress -from io import BytesIO - -import pytest - -from mitmproxy.net import socks -from mitmproxy.test import tutils - - -# this is a temporary placeholder here, we remove the file-based API when we transition socks proxying to sans-io. -class tutils: # noqa - @staticmethod - def treader(data: bytes): - io = BytesIO(data) - io.safe_read = io.read - return io - - -def test_client_greeting(): - raw = tutils.treader(b"\x05\x02\x00\xBE\xEF") - out = BytesIO() - msg = socks.ClientGreeting.from_file(raw) - msg.assert_socks5() - msg.to_file(out) - - assert out.getvalue() == raw.getvalue()[:-1] - assert msg.ver == 5 - assert len(msg.methods) == 2 - assert 0xBE in msg.methods - assert 0xEF not in msg.methods - - -def test_client_greeting_assert_socks5(): - raw = tutils.treader(b"\x00\x00") - msg = socks.ClientGreeting.from_file(raw) - with pytest.raises(socks.SocksError): - msg.assert_socks5() - - raw = tutils.treader(b"HTTP/1.1 200 OK" + b" " * 100) - msg = socks.ClientGreeting.from_file(raw) - try: - msg.assert_socks5() - except socks.SocksError as e: - assert "Invalid SOCKS version" in str(e) - assert "HTTP" not in str(e) - else: - assert False - - raw = tutils.treader(b"GET / HTTP/1.1" + b" " * 100) - msg = socks.ClientGreeting.from_file(raw) - try: - msg.assert_socks5() - except socks.SocksError as e: - assert "Invalid SOCKS version" in str(e) - assert "HTTP" in str(e) - else: - assert False - - raw = tutils.treader(b"XX") - with pytest.raises(socks.SocksError): - socks.ClientGreeting.from_file(raw, fail_early=True) - - -def test_server_greeting(): - raw = tutils.treader(b"\x05\x02") - out = BytesIO() - msg = socks.ServerGreeting.from_file(raw) - msg.assert_socks5() - msg.to_file(out) - - assert out.getvalue() == raw.getvalue() - assert msg.ver == 5 - assert msg.method == 0x02 - - -def test_server_greeting_assert_socks5(): - raw = tutils.treader(b"HTTP/1.1 200 OK" + b" " * 100) - msg = socks.ServerGreeting.from_file(raw) - try: - msg.assert_socks5() - except socks.SocksError as e: - assert "Invalid SOCKS version" in str(e) - assert "HTTP" in str(e) - else: - assert False - - raw = tutils.treader(b"GET / HTTP/1.1" + b" " * 100) - msg = socks.ServerGreeting.from_file(raw) - try: - msg.assert_socks5() - except socks.SocksError as e: - assert "Invalid SOCKS version" in str(e) - assert "HTTP" not in str(e) - else: - assert False - - -def test_username_password_auth(): - raw = tutils.treader(b"\x01\x03usr\x03psd\xBE\xEF") - out = BytesIO() - auth = socks.UsernamePasswordAuth.from_file(raw) - auth.assert_authver1() - assert raw.read(2) == b"\xBE\xEF" - auth.to_file(out) - - assert out.getvalue() == raw.getvalue()[:-2] - assert auth.ver == socks.USERNAME_PASSWORD_VERSION.DEFAULT - assert auth.username == "usr" - assert auth.password == "psd" - - -def test_username_password_auth_assert_ver1(): - raw = tutils.treader(b"\x02\x03usr\x03psd\xBE\xEF") - auth = socks.UsernamePasswordAuth.from_file(raw) - with pytest.raises(socks.SocksError): - auth.assert_authver1() - - -def test_username_password_auth_response(): - raw = tutils.treader(b"\x01\x00\xBE\xEF") - out = BytesIO() - auth = socks.UsernamePasswordAuthResponse.from_file(raw) - auth.assert_authver1() - assert raw.read(2) == b"\xBE\xEF" - auth.to_file(out) - - assert out.getvalue() == raw.getvalue()[:-2] - assert auth.ver == socks.USERNAME_PASSWORD_VERSION.DEFAULT - assert auth.status == 0 - - -def test_username_password_auth_response_auth_assert_ver1(): - raw = tutils.treader(b"\x02\x00\xBE\xEF") - auth = socks.UsernamePasswordAuthResponse.from_file(raw) - with pytest.raises(socks.SocksError): - auth.assert_authver1() - - -def test_message(): - raw = tutils.treader(b"\x05\x01\x00\x03\x0bexample.com\xDE\xAD\xBE\xEF") - out = BytesIO() - msg = socks.Message.from_file(raw) - msg.assert_socks5() - assert raw.read(2) == b"\xBE\xEF" - msg.to_file(out) - - assert out.getvalue() == raw.getvalue()[:-2] - assert msg.ver == 5 - assert msg.msg == 0x01 - assert msg.atyp == 0x03 - assert msg.addr == ("example.com", 0xDEAD) - - -def test_message_assert_socks5(): - raw = tutils.treader(b"\xEE\x01\x00\x03\x0bexample.com\xDE\xAD\xBE\xEF") - msg = socks.Message.from_file(raw) - with pytest.raises(socks.SocksError): - msg.assert_socks5() - - -def test_message_ipv4(): - # Test ATYP=0x01 (IPV4) - raw = tutils.treader(b"\x05\x01\x00\x01\x7f\x00\x00\x01\xDE\xAD\xBE\xEF") - out = BytesIO() - msg = socks.Message.from_file(raw) - left = raw.read(2) - assert left == b"\xBE\xEF" - msg.to_file(out) - - assert out.getvalue() == raw.getvalue()[:-2] - assert msg.addr == ("127.0.0.1", 0xDEAD) - - -def test_message_ipv6(): - # Test ATYP=0x04 (IPV6) - ipv6_addr = "2001:db8:85a3:8d3:1319:8a2e:370:7344" - - raw = tutils.treader( - b"\x05\x01\x00\x04" + - ipaddress.IPv6Address(ipv6_addr).packed + - b"\xDE\xAD\xBE\xEF") - out = BytesIO() - msg = socks.Message.from_file(raw) - assert raw.read(2) == b"\xBE\xEF" - msg.to_file(out) - - assert out.getvalue() == raw.getvalue()[:-2] - assert msg.addr[0] == ipv6_addr - - -def test_message_invalid_host(): - raw = tutils.treader(b"\xEE\x01\x00\x03\x0bexample@com\xDE\xAD\xBE\xEF") - with pytest.raises(socks.SocksError, match="Invalid hostname: b'example@com'"): - socks.Message.from_file(raw) - - -def test_message_invalid_rsv(): - raw = tutils.treader(b"\x05\x01\xFF\x01\x7f\x00\x00\x01\xDE\xAD\xBE\xEF") - with pytest.raises(socks.SocksError): - socks.Message.from_file(raw) - - -def test_message_unknown_atyp(): - raw = tutils.treader(b"\x05\x02\x00\x02\x7f\x00\x00\x01\xDE\xAD\xBE\xEF") - with pytest.raises(socks.SocksError): - socks.Message.from_file(raw) - - m = socks.Message(5, 1, 0x02, ("example.com", 5050)) - with pytest.raises(socks.SocksError): - m.to_file(BytesIO())