Add 'UsernamePasswordAuth' 'UsernamePasswordAuthResponse' to SOCKS

This commit is contained in:
yonder 2016-03-15 14:33:20 +08:00
parent 0e27dfd9c1
commit ba933dff2c
2 changed files with 96 additions and 1 deletions

View File

@ -10,7 +10,6 @@ class SocksError(Exception):
super(SocksError, self).__init__(message)
self.code = code
VERSION = utils.BiDi(
SOCKS4=0x04,
SOCKS5=0x05
@ -47,6 +46,10 @@ METHOD = utils.BiDi(
NO_ACCEPTABLE_METHODS=0xFF
)
USERNAME_PASSWORD_VERSION = utils.BiDi(
DEFAULT=0x01
)
class ClientGreeting(object):
__slots__ = ("ver", "methods")
@ -113,6 +116,59 @@ class ServerGreeting(object):
f.write(struct.pack("!BB", self.ver, self.method))
class UsernamePasswordAuth(object):
__slots__ = ("ver", "username", "password")
def __init__(self, ver, username, password):
self.ver = ver
self.username = username
self.password = password
def assert_authver1(self):
if self.ver != USERNAME_PASSWORD_VERSION.DEFAULT:
raise SocksError(
0,
"Invalid auth version. Expected 0x01, got 0x%x" % self.ver
)
@classmethod
def from_file(cls, f):
ver, ulen = struct.unpack("!BB", f.safe_read(2))
username = f.safe_read(ulen)
plen, = struct.unpack("!B", f.safe_read(1))
password = f.safe_read(plen)
return cls(ver, username.decode(), password.decode())
def to_file(self, f):
f.write(struct.pack("!BB", self.ver, len(self.username)))
f.write(self.username.encode())
f.write(struct.pack("!B", len(self.password)))
f.write(self.password.encode())
class UsernamePasswordAuthResponse(object):
__slots__ = ("ver", "status")
def __init__(self, ver, status):
self.ver = ver
self.status = status
def assert_authver1(self):
if self.ver != USERNAME_PASSWORD_VERSION.DEFAULT:
raise SocksError(
0,
"Invalid auth version. Expected 0x01, got 0x%x" % self.ver
)
@classmethod
def from_file(cls, f):
ver, status = struct.unpack("!BB", f.safe_read(2))
return cls(ver, status)
def to_file(self, f):
f.write(struct.pack("!BB", self.ver, self.status))
class Message(object):
__slots__ = ("ver", "msg", "atyp", "addr")

View File

@ -85,6 +85,45 @@ def test_server_greeting_assert_socks5():
assert False
def test_username_password_auth():
raw = tutils.treader(b"\x01\x03usr\x03psd\xBE\xEF")
out = BytesIO()
auth = socks.UsernamePasswordAuth.from_file(raw)
auth.assert_authver1()
assert raw.read(2) == b"\xBE\xEF"
auth.to_file(out)
assert out.getvalue() == raw.getvalue()[:-2]
assert auth.ver == socks.USERNAME_PASSWORD_VERSION.DEFAULT
assert auth.username == "usr"
assert auth.password == "psd"
def test_username_password_auth_assert_ver1():
raw = tutils.treader(b"\x02\x03usr\x03psd\xBE\xEF")
auth = socks.UsernamePasswordAuth.from_file(raw)
tutils.raises(socks.SocksError, auth.assert_authver1)
def test_username_password_auth_response():
raw = tutils.treader(b"\x01\x00\xBE\xEF")
out = BytesIO()
auth = socks.UsernamePasswordAuthResponse.from_file(raw)
auth.assert_authver1()
assert raw.read(2) == b"\xBE\xEF"
auth.to_file(out)
assert out.getvalue() == raw.getvalue()[:-2]
assert auth.ver == socks.USERNAME_PASSWORD_VERSION.DEFAULT
assert auth.status == 0
def test_username_password_auth_response_auth_assert_ver1():
raw = tutils.treader(b"\x02\x00\xBE\xEF")
auth = socks.UsernamePasswordAuthResponse.from_file(raw)
tutils.raises(socks.SocksError, auth.assert_authver1)
def test_message():
raw = tutils.treader(b"\x05\x01\x00\x03\x0bexample.com\xDE\xAD\xBE\xEF")
out = BytesIO()