mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-29 02:57:19 +00:00
feat: add test script for socks5auth
This commit is contained in:
parent
5876e431ce
commit
0c366f6436
@ -97,11 +97,12 @@ class Socks5Proxy(DestinationKnown):
|
||||
self,
|
||||
message: str,
|
||||
reply_code: Optional[int] = None,
|
||||
ver_code: int = SOCKS5_VERSION
|
||||
) -> layer.CommandGenerator[None]:
|
||||
if reply_code is not None:
|
||||
yield commands.SendData(
|
||||
self.context.client,
|
||||
bytes([SOCKS5_VERSION, reply_code]) + b"\x00\x01\x00\x00\x00\x00\x00\x00"
|
||||
bytes([ver_code, reply_code]) + b"\x00\x01\x00\x00\x00\x00\x00\x00"
|
||||
)
|
||||
yield commands.CloseConnection(self.context.client)
|
||||
yield commands.Log(message)
|
||||
@ -176,7 +177,7 @@ class Socks5Proxy(DestinationKnown):
|
||||
if self.check_auth(test_id, test_pw):
|
||||
yield commands.SendData(self.context.client, b"\x01\x00")
|
||||
else:
|
||||
yield from self.socks_err("authentication failed", 0x01)
|
||||
yield from self.socks_err("authentication failed", 0x01, ver_code=0x01)
|
||||
return
|
||||
self.buf = self.buf[3 + id_len + pw_len:]
|
||||
self.need_auth = False
|
||||
|
@ -334,9 +334,6 @@ def test_socks5_trickle(tctx: Context):
|
||||
(b"abcd",
|
||||
None,
|
||||
"Invalid SOCKS version. Expected 0x05, got 0x61"),
|
||||
(b"\x05\x01\x02",
|
||||
b"\x05\xFF\x00\x01\x00\x00\x00\x00\x00\x00",
|
||||
"mitmproxy only supports SOCKS without authentication"),
|
||||
(CLIENT_HELLO + b"\x05\x02\x00\x01\x7f\x00\x00\x01\x12\x34",
|
||||
SERVER_HELLO + b"\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00",
|
||||
r"Unsupported SOCKS5 request: b'\x05\x02\x00\x01\x7f\x00\x00\x01\x124'"),
|
||||
@ -356,6 +353,73 @@ def test_socks5_err(data: bytes, err: bytes, msg: str, tctx: Context):
|
||||
assert playbook
|
||||
|
||||
|
||||
@pytest.mark.parametrize("client_greeting,server_choice,client_auth,server_resp,address,packed", [
|
||||
(b"\x05\x01\x02",
|
||||
b"\x05\x02",
|
||||
b"\x01\x04" + b"user" + b"\x08" + b"password",
|
||||
b"\x01\x00",
|
||||
"127.0.0.1",
|
||||
b"\x01\x7f\x00\x00\x01"),
|
||||
(b"\x05\x02\x01\x02",
|
||||
b"\x05\x02",
|
||||
b"\x01\x04" + b"user" + b"\x08" + b"password",
|
||||
b"\x01\x00",
|
||||
"127.0.0.1",
|
||||
b"\x01\x7f\x00\x00\x01"),
|
||||
])
|
||||
def test_socks5_auth_success(client_greeting: bytes, server_choice: bytes, client_auth: bytes, server_resp: bytes,
|
||||
address: bytes, packed: bytes, tctx: Context):
|
||||
tctx.options.socks5auth = "user:password"
|
||||
server = Placeholder(Server)
|
||||
nextlayer = Placeholder(NextLayer)
|
||||
playbook = (
|
||||
Playbook(modes.Socks5Proxy(tctx), logs=True)
|
||||
>> DataReceived(tctx.client, client_greeting)
|
||||
<< SendData(tctx.client, server_choice)
|
||||
>> DataReceived(tctx.client, client_auth)
|
||||
<< SendData(tctx.client, server_resp)
|
||||
>> DataReceived(tctx.client, b"\x05\x01\x00" + packed + b"\x12\x34applicationdata")
|
||||
<< OpenConnection(server)
|
||||
>> reply(None)
|
||||
<< SendData(tctx.client, b"\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00")
|
||||
<< NextLayerHook(nextlayer)
|
||||
)
|
||||
assert playbook
|
||||
assert server().address == (address, 0x1234)
|
||||
assert nextlayer().data_client() == b"applicationdata"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("client_greeting,server_choice,client_auth,err,msg", [
|
||||
(b"\x05\x01\x00",
|
||||
None,
|
||||
None,
|
||||
b"\x05\xFF\x00\x01\x00\x00\x00\x00\x00\x00",
|
||||
"mitmproxy only support user password authentication"),
|
||||
(b"\x05\x02\x00\x02",
|
||||
b"\x05\x02",
|
||||
b"\x01\x04" + b"user" + b"\x07" + b"errcode",
|
||||
b"\x01\x01\x00\x01\x00\x00\x00\x00\x00\x00",
|
||||
"authentication failed"),
|
||||
])
|
||||
def test_socks5_auth_fail(client_greeting: bytes, server_choice: bytes, client_auth: bytes, err: bytes, msg: str,
|
||||
tctx: Context):
|
||||
tctx.options.socks5auth = "user:password"
|
||||
playbook = (
|
||||
Playbook(modes.Socks5Proxy(tctx), logs=True)
|
||||
>> DataReceived(tctx.client, client_greeting)
|
||||
)
|
||||
if server_choice is None:
|
||||
playbook << SendData(tctx.client, err)
|
||||
else:
|
||||
playbook << SendData(tctx.client, server_choice)
|
||||
playbook >> DataReceived(tctx.client, client_auth)
|
||||
playbook << SendData(tctx.client, err)
|
||||
|
||||
playbook << CloseConnection(tctx.client)
|
||||
playbook << Log(msg)
|
||||
assert playbook
|
||||
|
||||
|
||||
def test_socks5_eager_err(tctx: Context):
|
||||
tctx.options.connection_strategy = "eager"
|
||||
server = Placeholder(Server)
|
||||
|
Loading…
Reference in New Issue
Block a user