mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-25 09:37:37 +00:00
Merge pull request #4780 from mhils/socks5-auth
Support SOCKS5 Authentication
This commit is contained in:
commit
d5bba9878b
@ -2,6 +2,7 @@
|
||||
|
||||
## Unreleased: mitmproxy next
|
||||
|
||||
* Support proxy authentication for SOCKS v5 mode (@starplanet)
|
||||
* fix some responses not being decoded properly if the encoding was uppercase #4735 (@Mattwmaster58)
|
||||
* Expose TLS 1.0 as possible minimum version on older pyOpenSSL releases
|
||||
* Improve error message on TLS version mismatch.
|
||||
|
@ -8,7 +8,7 @@ from typing import List, Type
|
||||
import mitmproxy.addons.next_layer # noqa
|
||||
from mitmproxy import hooks, log, addonmanager
|
||||
from mitmproxy.proxy import server_hooks, layer
|
||||
from mitmproxy.proxy.layers import http, tcp, tls, websocket
|
||||
from mitmproxy.proxy.layers import http, modes, tcp, tls, websocket
|
||||
|
||||
known = set()
|
||||
|
||||
@ -137,6 +137,14 @@ with outfile.open("w") as f, contextlib.redirect_stdout(f):
|
||||
]
|
||||
)
|
||||
|
||||
category(
|
||||
"SOCKSv5",
|
||||
"",
|
||||
[
|
||||
modes.Socks5AuthHook,
|
||||
]
|
||||
)
|
||||
|
||||
category(
|
||||
"AdvancedLifecycle",
|
||||
"",
|
||||
|
@ -1,5 +1,8 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import binascii
|
||||
import weakref
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import MutableMapping
|
||||
from typing import Optional
|
||||
from typing import Tuple
|
||||
@ -7,14 +10,136 @@ from typing import Tuple
|
||||
import ldap3
|
||||
import passlib.apache
|
||||
|
||||
from mitmproxy import ctx, connection
|
||||
from mitmproxy import connection, ctx
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy import http
|
||||
from mitmproxy.net.http import status_codes
|
||||
from mitmproxy.proxy.layers import modes
|
||||
|
||||
REALM = "mitmproxy"
|
||||
|
||||
|
||||
class ProxyAuth:
|
||||
validator: Optional[Validator] = None
|
||||
|
||||
def __init__(self):
|
||||
self.authenticated: MutableMapping[connection.Client, Tuple[str, str]] = weakref.WeakKeyDictionary()
|
||||
"""Contains all connections that are permanently authenticated after an HTTP CONNECT"""
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"proxyauth", Optional[str], None,
|
||||
"""
|
||||
Require proxy authentication. Format:
|
||||
"username:pass",
|
||||
"any" to accept any user/pass combination,
|
||||
"@path" to use an Apache htpasswd file,
|
||||
or "ldap[s]:url_server_ldap:dn_auth:password:dn_subtree" for LDAP authentication.
|
||||
"""
|
||||
)
|
||||
|
||||
def configure(self, updated):
|
||||
if "proxyauth" not in updated:
|
||||
return
|
||||
auth = ctx.options.proxyauth
|
||||
if auth:
|
||||
if ctx.options.mode == "transparent":
|
||||
raise exceptions.OptionsError("Proxy Authentication not supported in transparent mode.")
|
||||
|
||||
if auth == "any":
|
||||
self.validator = AcceptAll()
|
||||
elif auth.startswith("@"):
|
||||
self.validator = Htpasswd(auth)
|
||||
elif ctx.options.proxyauth.startswith("ldap"):
|
||||
self.validator = Ldap(auth)
|
||||
elif ":" in ctx.options.proxyauth:
|
||||
self.validator = SingleUser(auth)
|
||||
else:
|
||||
raise exceptions.OptionsError("Invalid proxyauth specification.")
|
||||
else:
|
||||
self.validator = None
|
||||
|
||||
def socks5_auth(self, data: modes.Socks5AuthData) -> None:
|
||||
if self.validator and self.validator(data.username, data.password):
|
||||
data.valid = True
|
||||
self.authenticated[data.client_conn] = data.username, data.password
|
||||
|
||||
def http_connect(self, f: http.HTTPFlow) -> None:
|
||||
if self.validator and self.authenticate_http(f):
|
||||
# Make a note that all further requests over this connection are ok.
|
||||
self.authenticated[f.client_conn] = f.metadata["proxyauth"]
|
||||
|
||||
def requestheaders(self, f: http.HTTPFlow) -> None:
|
||||
if self.validator:
|
||||
# Is this connection authenticated by a previous HTTP CONNECT?
|
||||
if f.client_conn in self.authenticated:
|
||||
f.metadata["proxyauth"] = self.authenticated[f.client_conn]
|
||||
else:
|
||||
self.authenticate_http(f)
|
||||
|
||||
def authenticate_http(self, f: http.HTTPFlow) -> bool:
|
||||
"""
|
||||
Authenticate an HTTP request, returns if authentication was successful.
|
||||
|
||||
If valid credentials are found, the matching authentication header is removed.
|
||||
In no or invalid credentials are found, flow.response is set to an error page.
|
||||
"""
|
||||
assert self.validator
|
||||
username = None
|
||||
password = None
|
||||
is_valid = False
|
||||
try:
|
||||
auth_value = f.request.headers.get(self.http_auth_header, "")
|
||||
scheme, username, password = parse_http_basic_auth(auth_value)
|
||||
is_valid = self.validator(username, password)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if is_valid:
|
||||
f.metadata["proxyauth"] = (username, password)
|
||||
del f.request.headers[self.http_auth_header]
|
||||
return True
|
||||
else:
|
||||
f.response = self.make_auth_required_response()
|
||||
return False
|
||||
|
||||
def make_auth_required_response(self) -> http.Response:
|
||||
if self.is_http_proxy:
|
||||
status_code = status_codes.PROXY_AUTH_REQUIRED
|
||||
headers = {"Proxy-Authenticate": f'Basic realm="{REALM}"'}
|
||||
else:
|
||||
status_code = status_codes.UNAUTHORIZED
|
||||
headers = {"WWW-Authenticate": f'Basic realm="{REALM}"'}
|
||||
|
||||
reason = http.status_codes.RESPONSES[status_code]
|
||||
return http.Response.make(
|
||||
status_code,
|
||||
(
|
||||
f"<html>"
|
||||
f"<head><title>{status_code} {reason}</title></head>"
|
||||
f"<body><h1>{status_code} {reason}</h1></body>"
|
||||
f"</html>"
|
||||
),
|
||||
headers
|
||||
)
|
||||
|
||||
@property
|
||||
def http_auth_header(self) -> str:
|
||||
if self.is_http_proxy:
|
||||
return "Proxy-Authorization"
|
||||
else:
|
||||
return "Authorization"
|
||||
|
||||
@property
|
||||
def is_http_proxy(self) -> bool:
|
||||
"""
|
||||
Returns:
|
||||
- True, if authentication is done as if mitmproxy is a proxy
|
||||
- False, if authentication is done as if mitmproxy is an HTTP server
|
||||
"""
|
||||
return ctx.options.mode == "regular" or ctx.options.mode.startswith("upstream:")
|
||||
|
||||
|
||||
def mkauth(username: str, password: str, scheme: str = "basic") -> str:
|
||||
"""
|
||||
Craft a basic auth string
|
||||
@ -40,183 +165,78 @@ def parse_http_basic_auth(s: str) -> Tuple[str, str, str]:
|
||||
return scheme, user, password
|
||||
|
||||
|
||||
class ProxyAuth:
|
||||
def __init__(self):
|
||||
self.nonanonymous = False
|
||||
self.htpasswd = None
|
||||
self.singleuser = None
|
||||
self.ldapconn = None
|
||||
self.ldapserver = None
|
||||
self.authenticated: MutableMapping[connection.Client, Tuple[str, str]] = weakref.WeakKeyDictionary()
|
||||
"""Contains all connections that are permanently authenticated after an HTTP CONNECT"""
|
||||
class Validator(ABC):
|
||||
"""Base class for all username/password validators."""
|
||||
|
||||
def load(self, loader):
|
||||
loader.add_option(
|
||||
"proxyauth", Optional[str], None,
|
||||
"""
|
||||
Require proxy authentication. Format:
|
||||
"username:pass",
|
||||
"any" to accept any user/pass combination,
|
||||
"@path" to use an Apache htpasswd file,
|
||||
or "ldap[s]:url_server_ldap:dn_auth:password:dn_subtree" for LDAP authentication.
|
||||
"""
|
||||
)
|
||||
@abstractmethod
|
||||
def __call__(self, username: str, password: str) -> bool:
|
||||
raise NotImplementedError
|
||||
|
||||
def enabled(self) -> bool:
|
||||
return any([self.nonanonymous, self.htpasswd, self.singleuser, self.ldapconn, self.ldapserver])
|
||||
|
||||
def is_proxy_auth(self) -> bool:
|
||||
"""
|
||||
Returns:
|
||||
- True, if authentication is done as if mitmproxy is a proxy
|
||||
- False, if authentication is done as if mitmproxy is a HTTP server
|
||||
"""
|
||||
return ctx.options.mode == "regular" or ctx.options.mode.startswith("upstream:")
|
||||
class AcceptAll(Validator):
|
||||
def __call__(self, username: str, password: str) -> bool:
|
||||
return True
|
||||
|
||||
def which_auth_header(self) -> str:
|
||||
if self.is_proxy_auth():
|
||||
return 'Proxy-Authorization'
|
||||
else:
|
||||
return 'Authorization'
|
||||
|
||||
def auth_required_response(self) -> http.Response:
|
||||
if self.is_proxy_auth():
|
||||
status_code = status_codes.PROXY_AUTH_REQUIRED
|
||||
headers = {"Proxy-Authenticate": f'Basic realm="{REALM}"'}
|
||||
else:
|
||||
status_code = status_codes.UNAUTHORIZED
|
||||
headers = {"WWW-Authenticate": f'Basic realm="{REALM}"'}
|
||||
|
||||
reason = http.status_codes.RESPONSES[status_code]
|
||||
return http.Response.make(
|
||||
status_code,
|
||||
(
|
||||
f"<html>"
|
||||
f"<head><title>{status_code} {reason}</title></head>"
|
||||
f"<body><h1>{status_code} {reason}</h1></body>"
|
||||
f"</html>"
|
||||
),
|
||||
headers
|
||||
)
|
||||
|
||||
def check(self, f: http.HTTPFlow) -> Optional[Tuple[str, str]]:
|
||||
"""
|
||||
Check if a request is correctly authenticated.
|
||||
Returns:
|
||||
- a (username, password) tuple if successful,
|
||||
- None, otherwise.
|
||||
"""
|
||||
auth_value = f.request.headers.get(self.which_auth_header(), "")
|
||||
class SingleUser(Validator):
|
||||
def __init__(self, proxyauth: str):
|
||||
try:
|
||||
scheme, username, password = parse_http_basic_auth(auth_value)
|
||||
self.username, self.password = proxyauth.split(':')
|
||||
except ValueError:
|
||||
return None
|
||||
raise exceptions.OptionsError("Invalid single-user auth specification.")
|
||||
|
||||
if self.nonanonymous:
|
||||
return username, password
|
||||
elif self.singleuser:
|
||||
if self.singleuser == [username, password]:
|
||||
return username, password
|
||||
elif self.htpasswd:
|
||||
if self.htpasswd.check_password(username, password):
|
||||
return username, password
|
||||
elif self.ldapconn:
|
||||
if not username or not password:
|
||||
return None
|
||||
self.ldapconn.search(ctx.options.proxyauth.split(':')[4], '(cn=' + username + ')')
|
||||
if self.ldapconn.response:
|
||||
conn = ldap3.Connection(
|
||||
self.ldapserver,
|
||||
self.ldapconn.response[0]['dn'],
|
||||
password,
|
||||
auto_bind=True)
|
||||
if conn:
|
||||
return username, password
|
||||
return None
|
||||
def __call__(self, username: str, password: str) -> bool:
|
||||
return self.username == username and self.password == password
|
||||
|
||||
def authenticate(self, f: http.HTTPFlow) -> bool:
|
||||
valid_credentials = self.check(f)
|
||||
if valid_credentials:
|
||||
f.metadata["proxyauth"] = valid_credentials
|
||||
del f.request.headers[self.which_auth_header()]
|
||||
return True
|
||||
|
||||
class Htpasswd(Validator):
|
||||
def __init__(self, proxyauth: str):
|
||||
path = proxyauth[1:]
|
||||
try:
|
||||
self.htpasswd = passlib.apache.HtpasswdFile(path)
|
||||
except (ValueError, OSError):
|
||||
raise exceptions.OptionsError(f"Could not open htpasswd file: {path}")
|
||||
|
||||
def __call__(self, username: str, password: str) -> bool:
|
||||
return self.htpasswd.check_password(username, password)
|
||||
|
||||
|
||||
class Ldap(Validator):
|
||||
conn: ldap3.Connection
|
||||
server: ldap3.Server
|
||||
dn_subtree: str
|
||||
|
||||
def __init__(self, proxyauth: str):
|
||||
try:
|
||||
security, url, ldap_user, ldap_pass, self.dn_subtree = proxyauth.split(":")
|
||||
except ValueError:
|
||||
raise exceptions.OptionsError("Invalid ldap specification")
|
||||
if security == "ldaps":
|
||||
server = ldap3.Server(url, use_ssl=True)
|
||||
elif security == "ldap":
|
||||
server = ldap3.Server(url)
|
||||
else:
|
||||
f.response = self.auth_required_response()
|
||||
raise exceptions.OptionsError("Invalid ldap specification on the first part")
|
||||
conn = ldap3.Connection(
|
||||
server,
|
||||
ldap_user,
|
||||
ldap_pass,
|
||||
auto_bind=True
|
||||
)
|
||||
self.conn = conn
|
||||
self.server = server
|
||||
|
||||
def __call__(self, username: str, password: str) -> bool:
|
||||
if not username or not password:
|
||||
return False
|
||||
|
||||
# Handlers
|
||||
def configure(self, updated):
|
||||
if "proxyauth" in updated:
|
||||
self.nonanonymous = False
|
||||
self.singleuser = None
|
||||
self.htpasswd = None
|
||||
self.ldapserver = None
|
||||
if ctx.options.proxyauth:
|
||||
if ctx.options.proxyauth == "any":
|
||||
self.nonanonymous = True
|
||||
elif ctx.options.proxyauth.startswith("@"):
|
||||
p = ctx.options.proxyauth[1:]
|
||||
try:
|
||||
self.htpasswd = passlib.apache.HtpasswdFile(p)
|
||||
except (ValueError, OSError):
|
||||
raise exceptions.OptionsError(
|
||||
"Could not open htpasswd file: %s" % p
|
||||
)
|
||||
elif ctx.options.proxyauth.startswith("ldap"):
|
||||
parts = ctx.options.proxyauth.split(':')
|
||||
if len(parts) != 5:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid ldap specification"
|
||||
)
|
||||
security = parts[0]
|
||||
ldap_server = parts[1]
|
||||
dn_baseauth = parts[2]
|
||||
password_baseauth = parts[3]
|
||||
if security == "ldaps":
|
||||
server = ldap3.Server(ldap_server, use_ssl=True)
|
||||
elif security == "ldap":
|
||||
server = ldap3.Server(ldap_server)
|
||||
else:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid ldap specification on the first part"
|
||||
)
|
||||
conn = ldap3.Connection(
|
||||
server,
|
||||
dn_baseauth,
|
||||
password_baseauth,
|
||||
auto_bind=True)
|
||||
self.ldapconn = conn
|
||||
self.ldapserver = server
|
||||
elif ":" in ctx.options.proxyauth:
|
||||
parts = ctx.options.proxyauth.split(':')
|
||||
if len(parts) != 2:
|
||||
raise exceptions.OptionsError(
|
||||
"Invalid single-user auth specification."
|
||||
)
|
||||
self.singleuser = parts
|
||||
else:
|
||||
raise exceptions.OptionsError("Invalid proxyauth specification.")
|
||||
if self.enabled():
|
||||
if ctx.options.mode == "transparent":
|
||||
raise exceptions.OptionsError(
|
||||
"Proxy Authentication not supported in transparent mode."
|
||||
)
|
||||
if ctx.options.mode == "socks5":
|
||||
raise exceptions.OptionsError(
|
||||
"Proxy Authentication not supported in SOCKS mode. "
|
||||
"https://github.com/mitmproxy/mitmproxy/issues/738"
|
||||
)
|
||||
# TODO: check for multiple auth options
|
||||
|
||||
def http_connect(self, f: http.HTTPFlow) -> None:
|
||||
if self.enabled():
|
||||
if self.authenticate(f):
|
||||
self.authenticated[f.client_conn] = f.metadata["proxyauth"]
|
||||
|
||||
def requestheaders(self, f: http.HTTPFlow) -> None:
|
||||
if self.enabled():
|
||||
# Is this connection authenticated by a previous HTTP CONNECT?
|
||||
if f.client_conn in self.authenticated:
|
||||
f.metadata["proxyauth"] = self.authenticated[f.client_conn]
|
||||
return
|
||||
self.authenticate(f)
|
||||
self.conn.search(self.dn_subtree, f"(cn={username})")
|
||||
if self.conn.response:
|
||||
c = ldap3.Connection(
|
||||
self.server,
|
||||
self.conn.response[0]["dn"],
|
||||
password,
|
||||
auto_bind=True
|
||||
)
|
||||
if c:
|
||||
return True
|
||||
return False
|
||||
|
@ -1,11 +1,13 @@
|
||||
import socket
|
||||
import struct
|
||||
from abc import ABCMeta
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from mitmproxy import platform
|
||||
from mitmproxy import connection, platform
|
||||
from mitmproxy.net import server_spec
|
||||
from mitmproxy.proxy import commands, events, layer
|
||||
from mitmproxy.proxy.commands import StartHook
|
||||
from mitmproxy.proxy.layers import tls
|
||||
from mitmproxy.proxy.utils import expect
|
||||
|
||||
@ -76,6 +78,7 @@ class TransparentProxy(DestinationKnown):
|
||||
SOCKS5_VERSION = 0x05
|
||||
|
||||
SOCKS5_METHOD_NO_AUTHENTICATION_REQUIRED = 0x00
|
||||
SOCKS5_METHOD_USER_PASSWORD_AUTHENTICATION = 0x02
|
||||
SOCKS5_METHOD_NO_ACCEPTABLE_METHODS = 0xFF
|
||||
|
||||
SOCKS5_ATYP_IPV4_ADDRESS = 0x01
|
||||
@ -87,9 +90,26 @@ SOCKS5_REP_COMMAND_NOT_SUPPORTED = 0x07
|
||||
SOCKS5_REP_ADDRESS_TYPE_NOT_SUPPORTED = 0x08
|
||||
|
||||
|
||||
@dataclass
|
||||
class Socks5AuthData:
|
||||
client_conn: connection.Client
|
||||
username: str
|
||||
password: str
|
||||
valid: bool = False
|
||||
|
||||
|
||||
@dataclass
|
||||
class Socks5AuthHook(StartHook):
|
||||
"""
|
||||
Mitmproxy has received username/password SOCKS5 credentials.
|
||||
|
||||
This hook decides whether they are valid by setting `data.valid`.
|
||||
"""
|
||||
data: Socks5AuthData
|
||||
|
||||
|
||||
class Socks5Proxy(DestinationKnown):
|
||||
buf: bytes = b""
|
||||
greeted: bool = False
|
||||
|
||||
def socks_err(
|
||||
self,
|
||||
@ -111,92 +131,129 @@ class Socks5Proxy(DestinationKnown):
|
||||
pass
|
||||
elif isinstance(event, events.DataReceived):
|
||||
self.buf += event.data
|
||||
|
||||
if not self.greeted:
|
||||
# Parse Client Greeting
|
||||
if len(self.buf) < 2:
|
||||
return
|
||||
|
||||
if self.buf[0] != SOCKS5_VERSION:
|
||||
if self.buf[:3].isupper():
|
||||
guess = "Probably not a SOCKS request but a regular HTTP request. "
|
||||
else:
|
||||
guess = ""
|
||||
yield from self.socks_err(guess + "Invalid SOCKS version. Expected 0x05, got 0x%x" % self.buf[0])
|
||||
return
|
||||
|
||||
n_methods = self.buf[1]
|
||||
if len(self.buf) < 2 + n_methods:
|
||||
return
|
||||
if SOCKS5_METHOD_NO_AUTHENTICATION_REQUIRED not in self.buf[2:2 + n_methods]:
|
||||
yield from self.socks_err("mitmproxy only supports SOCKS without authentication",
|
||||
SOCKS5_METHOD_NO_ACCEPTABLE_METHODS)
|
||||
return
|
||||
|
||||
# Send Server Greeting
|
||||
# Ver = SOCKS5, Auth = NO_AUTH
|
||||
yield commands.SendData(self.context.client, b"\x05\x00")
|
||||
self.buf = self.buf[2 + n_methods:]
|
||||
self.greeted = True
|
||||
|
||||
# Parse Connect Request
|
||||
if len(self.buf) < 4:
|
||||
return
|
||||
|
||||
if self.buf[:3] != b"\x05\x01\x00":
|
||||
yield from self.socks_err(f"Unsupported SOCKS5 request: {self.buf!r}", SOCKS5_REP_COMMAND_NOT_SUPPORTED)
|
||||
return
|
||||
|
||||
# Determine message length
|
||||
atyp = self.buf[3]
|
||||
message_len: int
|
||||
if atyp == SOCKS5_ATYP_IPV4_ADDRESS:
|
||||
message_len = 4 + 4 + 2
|
||||
elif atyp == SOCKS5_ATYP_IPV6_ADDRESS:
|
||||
message_len = 4 + 16 + 2
|
||||
elif atyp == SOCKS5_ATYP_DOMAINNAME:
|
||||
message_len = 4 + 1 + self.buf[4] + 2
|
||||
else:
|
||||
yield from self.socks_err(f"Unknown address type: {atyp}", SOCKS5_REP_ADDRESS_TYPE_NOT_SUPPORTED)
|
||||
return
|
||||
|
||||
# Do we have enough bytes yet?
|
||||
if len(self.buf) < message_len:
|
||||
return
|
||||
|
||||
# Parse host and port
|
||||
msg, self.buf = self.buf[:message_len], self.buf[message_len:]
|
||||
|
||||
host: str
|
||||
if atyp == SOCKS5_ATYP_IPV4_ADDRESS:
|
||||
host = socket.inet_ntop(socket.AF_INET, msg[4:-2])
|
||||
elif atyp == SOCKS5_ATYP_IPV6_ADDRESS:
|
||||
host = socket.inet_ntop(socket.AF_INET6, msg[4:-2])
|
||||
else:
|
||||
host_bytes = msg[5:-2]
|
||||
host = host_bytes.decode("ascii", "replace")
|
||||
|
||||
port, = struct.unpack("!H", msg[-2:])
|
||||
|
||||
# We now have all we need, let's get going.
|
||||
self.context.server.address = (host, port)
|
||||
self.child_layer = layer.NextLayer(self.context)
|
||||
|
||||
# this already triggers the child layer's Start event,
|
||||
# but that's not a problem in practice...
|
||||
err = yield from self.finish_start()
|
||||
if err:
|
||||
yield commands.SendData(self.context.client, b"\x05\x04\x00\x01\x00\x00\x00\x00\x00\x00")
|
||||
yield commands.CloseConnection(self.context.client)
|
||||
else:
|
||||
yield commands.SendData(self.context.client, b"\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00")
|
||||
if self.buf:
|
||||
yield from self.child_layer.handle_event(events.DataReceived(self.context.client, self.buf))
|
||||
del self.buf
|
||||
|
||||
yield from self.state()
|
||||
elif isinstance(event, events.ConnectionClosed):
|
||||
if self.buf:
|
||||
yield commands.Log(f"Client closed connection before completing SOCKS5 handshake: {self.buf!r}")
|
||||
yield commands.CloseConnection(event.connection)
|
||||
else:
|
||||
raise AssertionError(f"Unknown event: {event}")
|
||||
|
||||
def state_greet(self):
|
||||
if len(self.buf) < 2:
|
||||
return
|
||||
|
||||
if self.buf[0] != SOCKS5_VERSION:
|
||||
if self.buf[:3].isupper():
|
||||
guess = "Probably not a SOCKS request but a regular HTTP request. "
|
||||
else:
|
||||
guess = ""
|
||||
yield from self.socks_err(guess + "Invalid SOCKS version. Expected 0x05, got 0x%x" % self.buf[0])
|
||||
return
|
||||
|
||||
n_methods = self.buf[1]
|
||||
if len(self.buf) < 2 + n_methods:
|
||||
return
|
||||
|
||||
if "proxyauth" in self.context.options and self.context.options.proxyauth:
|
||||
method = SOCKS5_METHOD_USER_PASSWORD_AUTHENTICATION
|
||||
self.state = self.state_auth
|
||||
else:
|
||||
method = SOCKS5_METHOD_NO_AUTHENTICATION_REQUIRED
|
||||
self.state = self.state_connect
|
||||
|
||||
if method not in self.buf[2:2 + n_methods]:
|
||||
method_str = "user/password" if method == SOCKS5_METHOD_USER_PASSWORD_AUTHENTICATION else "no"
|
||||
yield from self.socks_err(
|
||||
f"Client does not support SOCKS5 with {method_str} authentication.",
|
||||
SOCKS5_METHOD_NO_ACCEPTABLE_METHODS
|
||||
)
|
||||
return
|
||||
yield commands.SendData(self.context.client, bytes([SOCKS5_VERSION, method]))
|
||||
self.buf = self.buf[2 + n_methods:]
|
||||
yield from self.state()
|
||||
|
||||
state = state_greet
|
||||
|
||||
def state_auth(self):
|
||||
if len(self.buf) < 3:
|
||||
return
|
||||
|
||||
# Parsing username and password, which is somewhat atrocious
|
||||
user_len = self.buf[1]
|
||||
if len(self.buf) < 3 + user_len:
|
||||
return
|
||||
pass_len = self.buf[2 + user_len]
|
||||
if len(self.buf) < 3 + user_len + pass_len:
|
||||
return
|
||||
user = self.buf[2:(2 + user_len)].decode("utf-8", "backslashreplace")
|
||||
password = self.buf[(3 + user_len):(3 + user_len + pass_len)].decode("utf-8", "backslashreplace")
|
||||
|
||||
data = Socks5AuthData(self.context.client, user, password)
|
||||
yield Socks5AuthHook(data)
|
||||
if not data.valid:
|
||||
# The VER field contains the current **version of the subnegotiation**, which is X'01'.
|
||||
yield commands.SendData(self.context.client, b"\x01\x01")
|
||||
yield from self.socks_err("authentication failed")
|
||||
return
|
||||
|
||||
yield commands.SendData(self.context.client, b"\x01\x00")
|
||||
self.buf = self.buf[3 + user_len + pass_len:]
|
||||
self.state = self.state_connect
|
||||
yield from self.state()
|
||||
|
||||
def state_connect(self):
|
||||
# Parse Connect Request
|
||||
if len(self.buf) < 4:
|
||||
return
|
||||
|
||||
if self.buf[:3] != b"\x05\x01\x00":
|
||||
yield from self.socks_err(f"Unsupported SOCKS5 request: {self.buf!r}", SOCKS5_REP_COMMAND_NOT_SUPPORTED)
|
||||
return
|
||||
|
||||
# Determine message length
|
||||
atyp = self.buf[3]
|
||||
message_len: int
|
||||
if atyp == SOCKS5_ATYP_IPV4_ADDRESS:
|
||||
message_len = 4 + 4 + 2
|
||||
elif atyp == SOCKS5_ATYP_IPV6_ADDRESS:
|
||||
message_len = 4 + 16 + 2
|
||||
elif atyp == SOCKS5_ATYP_DOMAINNAME:
|
||||
message_len = 4 + 1 + self.buf[4] + 2
|
||||
else:
|
||||
yield from self.socks_err(f"Unknown address type: {atyp}", SOCKS5_REP_ADDRESS_TYPE_NOT_SUPPORTED)
|
||||
return
|
||||
|
||||
# Do we have enough bytes yet?
|
||||
if len(self.buf) < message_len:
|
||||
return
|
||||
|
||||
# Parse host and port
|
||||
msg, self.buf = self.buf[:message_len], self.buf[message_len:]
|
||||
|
||||
host: str
|
||||
if atyp == SOCKS5_ATYP_IPV4_ADDRESS:
|
||||
host = socket.inet_ntop(socket.AF_INET, msg[4:-2])
|
||||
elif atyp == SOCKS5_ATYP_IPV6_ADDRESS:
|
||||
host = socket.inet_ntop(socket.AF_INET6, msg[4:-2])
|
||||
else:
|
||||
host_bytes = msg[5:-2]
|
||||
host = host_bytes.decode("ascii", "replace")
|
||||
|
||||
port, = struct.unpack("!H", msg[-2:])
|
||||
|
||||
# We now have all we need, let's get going.
|
||||
self.context.server.address = (host, port)
|
||||
self.child_layer = layer.NextLayer(self.context)
|
||||
|
||||
# this already triggers the child layer's Start event,
|
||||
# but that's not a problem in practice...
|
||||
err = yield from self.finish_start()
|
||||
if err:
|
||||
yield commands.SendData(self.context.client, b"\x05\x04\x00\x01\x00\x00\x00\x00\x00\x00")
|
||||
yield commands.CloseConnection(self.context.client)
|
||||
else:
|
||||
yield commands.SendData(self.context.client, b"\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00")
|
||||
if self.buf:
|
||||
yield from self.child_layer.handle_event(events.DataReceived(self.context.client, self.buf))
|
||||
del self.buf
|
||||
|
@ -6,6 +6,7 @@ import pytest
|
||||
|
||||
from mitmproxy import exceptions
|
||||
from mitmproxy.addons import proxyauth
|
||||
from mitmproxy.proxy.layers import modes
|
||||
from mitmproxy.test import taddons
|
||||
from mitmproxy.test import tflow
|
||||
|
||||
@ -47,89 +48,42 @@ class TestProxyAuth:
|
||||
('upstream:', True),
|
||||
('upstream:foobar', True),
|
||||
])
|
||||
def test_is_proxy_auth(self, mode, expected):
|
||||
def test_is_http_proxy(self, mode, expected):
|
||||
up = proxyauth.ProxyAuth()
|
||||
with taddons.context(up, loadcore=False) as ctx:
|
||||
ctx.options.mode = mode
|
||||
assert up.is_proxy_auth() is expected
|
||||
assert up.is_http_proxy is expected
|
||||
|
||||
@pytest.mark.parametrize('is_proxy_auth, expected', [
|
||||
@pytest.mark.parametrize('is_http_proxy, expected', [
|
||||
(True, 'Proxy-Authorization'),
|
||||
(False, 'Authorization'),
|
||||
])
|
||||
def test_which_auth_header(self, is_proxy_auth, expected):
|
||||
def test_which_auth_header(self, is_http_proxy, expected):
|
||||
up = proxyauth.ProxyAuth()
|
||||
with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_proxy_auth', return_value=is_proxy_auth):
|
||||
assert up.which_auth_header() == expected
|
||||
with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_http_proxy', new=is_http_proxy):
|
||||
assert up.http_auth_header == expected
|
||||
|
||||
@pytest.mark.parametrize('is_proxy_auth, expected_status_code, expected_header', [
|
||||
@pytest.mark.parametrize('is_http_proxy, expected_status_code, expected_header', [
|
||||
(True, 407, 'Proxy-Authenticate'),
|
||||
(False, 401, 'WWW-Authenticate'),
|
||||
])
|
||||
def test_auth_required_response(self, is_proxy_auth, expected_status_code, expected_header):
|
||||
def test_auth_required_response(self, is_http_proxy, expected_status_code, expected_header):
|
||||
up = proxyauth.ProxyAuth()
|
||||
with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_proxy_auth', return_value=is_proxy_auth):
|
||||
resp = up.auth_required_response()
|
||||
with mock.patch('mitmproxy.addons.proxyauth.ProxyAuth.is_http_proxy', new=is_http_proxy):
|
||||
resp = up.make_auth_required_response()
|
||||
assert resp.status_code == expected_status_code
|
||||
assert expected_header in resp.headers.keys()
|
||||
|
||||
def test_check(self, tdata):
|
||||
up = proxyauth.ProxyAuth()
|
||||
with taddons.context(up) as ctx:
|
||||
ctx.configure(up, proxyauth="any", mode="regular")
|
||||
f = tflow.tflow()
|
||||
assert not up.check(f)
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
assert up.check(f)
|
||||
|
||||
f.request.headers["Proxy-Authorization"] = "invalid"
|
||||
assert not up.check(f)
|
||||
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test", scheme="unknown"
|
||||
)
|
||||
assert not up.check(f)
|
||||
|
||||
ctx.configure(up, proxyauth="test:test")
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
assert up.check(f)
|
||||
ctx.configure(up, proxyauth="test:foo")
|
||||
assert not up.check(f)
|
||||
|
||||
ctx.configure(
|
||||
up,
|
||||
proxyauth="@" + tdata.path(
|
||||
"mitmproxy/net/data/htpasswd"
|
||||
)
|
||||
)
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
assert up.check(f)
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "foo"
|
||||
)
|
||||
assert not up.check(f)
|
||||
|
||||
with mock.patch('ldap3.Server', return_value="ldap://fake_server:389 - cleartext"):
|
||||
with mock.patch('ldap3.Connection', search="test"):
|
||||
with mock.patch('ldap3.Connection.search', return_value="test"):
|
||||
ctx.configure(
|
||||
up,
|
||||
proxyauth="ldap:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com"
|
||||
)
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
assert up.check(f)
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"", ""
|
||||
)
|
||||
assert not up.check(f)
|
||||
def test_socks5(self):
|
||||
pa = proxyauth.ProxyAuth()
|
||||
with taddons.context(pa, loadcore=False) as ctx:
|
||||
ctx.configure(pa, proxyauth="foo:bar", mode="regular")
|
||||
data = modes.Socks5AuthData(tflow.tclient_conn(), "foo", "baz")
|
||||
pa.socks5_auth(data)
|
||||
assert not data.valid
|
||||
data.password = "bar"
|
||||
pa.socks5_auth(data)
|
||||
assert data.valid
|
||||
|
||||
def test_authenticate(self):
|
||||
up = proxyauth.ProxyAuth()
|
||||
@ -138,64 +92,60 @@ class TestProxyAuth:
|
||||
|
||||
f = tflow.tflow()
|
||||
assert not f.response
|
||||
up.authenticate(f)
|
||||
up.authenticate_http(f)
|
||||
assert f.response.status_code == 407
|
||||
|
||||
f = tflow.tflow()
|
||||
f.request.headers["Proxy-Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
up.authenticate(f)
|
||||
up.authenticate_http(f)
|
||||
assert not f.response
|
||||
assert not f.request.headers.get("Proxy-Authorization")
|
||||
|
||||
f = tflow.tflow()
|
||||
ctx.configure(up, mode="reverse")
|
||||
assert not f.response
|
||||
up.authenticate(f)
|
||||
up.authenticate_http(f)
|
||||
assert f.response.status_code == 401
|
||||
|
||||
f = tflow.tflow()
|
||||
f.request.headers["Authorization"] = proxyauth.mkauth(
|
||||
"test", "test"
|
||||
)
|
||||
up.authenticate(f)
|
||||
up.authenticate_http(f)
|
||||
assert not f.response
|
||||
assert not f.request.headers.get("Authorization")
|
||||
|
||||
def test_configure(self, monkeypatch, tdata):
|
||||
monkeypatch.setattr(ldap3, "Server", lambda *_, **__: True)
|
||||
monkeypatch.setattr(ldap3, "Connection", lambda *_, **__: True)
|
||||
monkeypatch.setattr(ldap3, "Server", mock.MagicMock())
|
||||
monkeypatch.setattr(ldap3, "Connection", mock.MagicMock())
|
||||
|
||||
pa = proxyauth.ProxyAuth()
|
||||
with taddons.context(pa) as ctx:
|
||||
with pytest.raises(exceptions.OptionsError, match="Invalid proxyauth specification"):
|
||||
ctx.configure(pa, proxyauth="foo")
|
||||
|
||||
ctx.configure(pa, proxyauth="foo:bar")
|
||||
assert isinstance(pa.validator, proxyauth.SingleUser)
|
||||
assert pa.validator("foo", "bar")
|
||||
assert not pa.validator("foo", "baz")
|
||||
|
||||
with pytest.raises(exceptions.OptionsError, match="Invalid single-user auth specification."):
|
||||
ctx.configure(pa, proxyauth="foo:bar:baz")
|
||||
|
||||
ctx.configure(pa, proxyauth="foo:bar")
|
||||
assert pa.singleuser == ["foo", "bar"]
|
||||
|
||||
ctx.configure(pa, proxyauth=None)
|
||||
assert pa.singleuser is None
|
||||
|
||||
ctx.configure(pa, proxyauth="any")
|
||||
assert pa.nonanonymous
|
||||
assert isinstance(pa.validator, proxyauth.AcceptAll)
|
||||
assert pa.validator("foo", "bar")
|
||||
|
||||
ctx.configure(pa, proxyauth=None)
|
||||
assert not pa.nonanonymous
|
||||
assert pa.validator is None
|
||||
|
||||
ctx.configure(
|
||||
pa,
|
||||
proxyauth="ldap:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com"
|
||||
)
|
||||
assert pa.ldapserver
|
||||
ctx.configure(
|
||||
pa,
|
||||
proxyauth="ldaps:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com"
|
||||
)
|
||||
assert pa.ldapserver
|
||||
assert isinstance(pa.validator, proxyauth.Ldap)
|
||||
|
||||
with pytest.raises(exceptions.OptionsError, match="Invalid ldap specification"):
|
||||
ctx.configure(pa, proxyauth="ldap:test:test:test")
|
||||
@ -207,30 +157,18 @@ class TestProxyAuth:
|
||||
ctx.configure(pa, proxyauth="ldapssssssss:fake_server:dn:password:tree")
|
||||
|
||||
with pytest.raises(exceptions.OptionsError, match="Could not open htpasswd file"):
|
||||
ctx.configure(
|
||||
pa,
|
||||
proxyauth="@" + tdata.path("mitmproxy/net/data/server.crt")
|
||||
)
|
||||
ctx.configure(pa, proxyauth="@" + tdata.path("mitmproxy/net/data/server.crt"))
|
||||
with pytest.raises(exceptions.OptionsError, match="Could not open htpasswd file"):
|
||||
ctx.configure(pa, proxyauth="@nonexistent")
|
||||
|
||||
ctx.configure(
|
||||
pa,
|
||||
proxyauth="@" + tdata.path(
|
||||
"mitmproxy/net/data/htpasswd"
|
||||
)
|
||||
)
|
||||
assert pa.htpasswd
|
||||
assert pa.htpasswd.check_password("test", "test")
|
||||
assert not pa.htpasswd.check_password("test", "foo")
|
||||
ctx.configure(pa, proxyauth=None)
|
||||
assert not pa.htpasswd
|
||||
ctx.configure(pa, proxyauth="@" + tdata.path("mitmproxy/net/data/htpasswd"))
|
||||
assert isinstance(pa.validator, proxyauth.Htpasswd)
|
||||
assert pa.validator("test", "test")
|
||||
assert not pa.validator("test", "foo")
|
||||
|
||||
with pytest.raises(exceptions.OptionsError,
|
||||
match="Proxy Authentication not supported in transparent mode."):
|
||||
ctx.configure(pa, proxyauth="any", mode="transparent")
|
||||
with pytest.raises(exceptions.OptionsError, match="Proxy Authentication not supported in SOCKS mode."):
|
||||
ctx.configure(pa, proxyauth="any", mode="socks5")
|
||||
|
||||
def test_handlers(self):
|
||||
up = proxyauth.ProxyAuth()
|
||||
@ -260,3 +198,14 @@ class TestProxyAuth:
|
||||
up.requestheaders(f2)
|
||||
assert not f2.response
|
||||
assert f2.metadata["proxyauth"] == ('test', 'test')
|
||||
|
||||
|
||||
def test_ldap(monkeypatch):
|
||||
monkeypatch.setattr(ldap3, "Server", mock.MagicMock())
|
||||
monkeypatch.setattr(ldap3, "Connection", mock.MagicMock())
|
||||
|
||||
validator = proxyauth.Ldap("ldaps:localhost:cn=default,dc=cdhdt,dc=com:password:ou=application,dc=cdhdt,dc=com")
|
||||
assert not validator("", "")
|
||||
assert validator("foo", "bar")
|
||||
validator.conn.response = False
|
||||
assert not validator("foo", "bar")
|
||||
|
@ -3,6 +3,7 @@ import copy
|
||||
import pytest
|
||||
|
||||
from mitmproxy import platform
|
||||
from mitmproxy.addons.proxyauth import ProxyAuth
|
||||
from mitmproxy.connection import Client, Server
|
||||
from mitmproxy.proxy.commands import CloseConnection, GetSocket, Log, OpenConnection, SendData
|
||||
from mitmproxy.proxy.context import Context
|
||||
@ -316,12 +317,23 @@ def test_socks5_success(address: str, packed: bytes, tctx: Context):
|
||||
assert nextlayer().data_client() == b"applicationdata"
|
||||
|
||||
|
||||
def _valid_socks_auth(data: modes.Socks5AuthData):
|
||||
data.valid = True
|
||||
|
||||
|
||||
def test_socks5_trickle(tctx: Context):
|
||||
ProxyAuth().load(tctx.options)
|
||||
tctx.options.proxyauth = "user:password"
|
||||
tctx.options.connection_strategy = "lazy"
|
||||
playbook = Playbook(modes.Socks5Proxy(tctx))
|
||||
for x in CLIENT_HELLO:
|
||||
for x in b"\x05\x01\x02":
|
||||
playbook >> DataReceived(tctx.client, bytes([x]))
|
||||
playbook << SendData(tctx.client, b"\x05\x00")
|
||||
playbook << SendData(tctx.client, b"\x05\x02")
|
||||
for x in b"\x01\x04user\x08password":
|
||||
playbook >> DataReceived(tctx.client, bytes([x]))
|
||||
playbook << modes.Socks5AuthHook(Placeholder())
|
||||
playbook >> reply(side_effect=_valid_socks_auth)
|
||||
playbook << SendData(tctx.client, b"\x01\x00")
|
||||
for x in b"\x05\x01\x00\x01\x7f\x00\x00\x01\x12\x34":
|
||||
playbook >> DataReceived(tctx.client, bytes([x]))
|
||||
assert playbook << SendData(tctx.client, b"\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00")
|
||||
@ -334,9 +346,6 @@ def test_socks5_trickle(tctx: Context):
|
||||
(b"abcd",
|
||||
None,
|
||||
"Invalid SOCKS version. Expected 0x05, got 0x61"),
|
||||
(b"\x05\x01\x02",
|
||||
b"\x05\xFF\x00\x01\x00\x00\x00\x00\x00\x00",
|
||||
"mitmproxy only supports SOCKS without authentication"),
|
||||
(CLIENT_HELLO + b"\x05\x02\x00\x01\x7f\x00\x00\x01\x12\x34",
|
||||
SERVER_HELLO + b"\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00",
|
||||
r"Unsupported SOCKS5 request: b'\x05\x02\x00\x01\x7f\x00\x00\x01\x124'"),
|
||||
@ -356,6 +365,79 @@ def test_socks5_err(data: bytes, err: bytes, msg: str, tctx: Context):
|
||||
assert playbook
|
||||
|
||||
|
||||
@pytest.mark.parametrize("client_greeting,server_choice,client_auth,server_resp,address,packed", [
|
||||
(b"\x05\x01\x02",
|
||||
b"\x05\x02",
|
||||
b"\x01\x04user\x08password",
|
||||
b"\x01\x00",
|
||||
"127.0.0.1",
|
||||
b"\x01\x7f\x00\x00\x01"),
|
||||
(b"\x05\x02\x01\x02",
|
||||
b"\x05\x02",
|
||||
b"\x01\x04user\x08password",
|
||||
b"\x01\x00",
|
||||
"127.0.0.1",
|
||||
b"\x01\x7f\x00\x00\x01"),
|
||||
])
|
||||
def test_socks5_auth_success(client_greeting: bytes, server_choice: bytes, client_auth: bytes, server_resp: bytes,
|
||||
address: bytes, packed: bytes, tctx: Context):
|
||||
ProxyAuth().load(tctx.options)
|
||||
tctx.options.proxyauth = "user:password"
|
||||
server = Placeholder(Server)
|
||||
nextlayer = Placeholder(NextLayer)
|
||||
playbook = (
|
||||
Playbook(modes.Socks5Proxy(tctx), logs=True)
|
||||
>> DataReceived(tctx.client, client_greeting)
|
||||
<< SendData(tctx.client, server_choice)
|
||||
>> DataReceived(tctx.client, client_auth)
|
||||
<< modes.Socks5AuthHook(Placeholder(modes.Socks5AuthData))
|
||||
>> reply(side_effect=_valid_socks_auth)
|
||||
<< SendData(tctx.client, server_resp)
|
||||
>> DataReceived(tctx.client, b"\x05\x01\x00" + packed + b"\x12\x34applicationdata")
|
||||
<< OpenConnection(server)
|
||||
>> reply(None)
|
||||
<< SendData(tctx.client, b"\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00")
|
||||
<< NextLayerHook(nextlayer)
|
||||
)
|
||||
assert playbook
|
||||
assert server().address == (address, 0x1234)
|
||||
assert nextlayer().data_client() == b"applicationdata"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("client_greeting,server_choice,client_auth,err,msg", [
|
||||
(b"\x05\x01\x00",
|
||||
None,
|
||||
None,
|
||||
b"\x05\xFF\x00\x01\x00\x00\x00\x00\x00\x00",
|
||||
"Client does not support SOCKS5 with user/password authentication."),
|
||||
(b"\x05\x02\x00\x02",
|
||||
b"\x05\x02",
|
||||
b"\x01\x04" + b"user" + b"\x07" + b"errcode",
|
||||
b"\x01\x01",
|
||||
"authentication failed"),
|
||||
])
|
||||
def test_socks5_auth_fail(client_greeting: bytes, server_choice: bytes, client_auth: bytes, err: bytes, msg: str,
|
||||
tctx: Context):
|
||||
ProxyAuth().load(tctx.options)
|
||||
tctx.options.proxyauth = "user:password"
|
||||
playbook = (
|
||||
Playbook(modes.Socks5Proxy(tctx), logs=True)
|
||||
>> DataReceived(tctx.client, client_greeting)
|
||||
)
|
||||
if server_choice is None:
|
||||
playbook << SendData(tctx.client, err)
|
||||
else:
|
||||
playbook << SendData(tctx.client, server_choice)
|
||||
playbook >> DataReceived(tctx.client, client_auth)
|
||||
playbook << modes.Socks5AuthHook(Placeholder(modes.Socks5AuthData))
|
||||
playbook >> reply()
|
||||
playbook << SendData(tctx.client, err)
|
||||
|
||||
playbook << CloseConnection(tctx.client)
|
||||
playbook << Log(msg)
|
||||
assert playbook
|
||||
|
||||
|
||||
def test_socks5_eager_err(tctx: Context):
|
||||
tctx.options.connection_strategy = "eager"
|
||||
server = Placeholder(Server)
|
||||
|
Loading…
Reference in New Issue
Block a user