socks module: polish, add tests

This commit is contained in:
Maximilian Hils 2014-06-25 20:31:28 +02:00
parent dc3d3e5f0a
commit 6405595ae8
2 changed files with 83 additions and 43 deletions

View File

@ -1,7 +1,7 @@
import socket
import struct
from array import array
from .tcp import Address
from . import tcp, utils
class SocksError(Exception):
@ -9,6 +9,7 @@ class SocksError(Exception):
super(SocksError, self).__init__(message)
self.code = code
class VERSION:
SOCKS4 = 0x04
SOCKS5 = 0x05
@ -25,6 +26,7 @@ class ATYP:
DOMAINNAME = 0x03
IPV6_ADDRESS = 0x04
class REP:
SUCCEEDED = 0x00
GENERAL_SOCKS_SERVER_FAILURE = 0x01
@ -36,6 +38,7 @@ class REP:
COMMAND_NOT_SUPPORTED = 0x07
ADDRESS_TYPE_NOT_SUPPORTED = 0x08
class METHOD:
NO_AUTHENTICATION_REQUIRED = 0x00
GSSAPI = 0x01
@ -52,15 +55,14 @@ class ClientGreeting(object):
@classmethod
def from_file(cls, f):
ver, nmethods = struct.unpack_from("!BB", f)
ver, nmethods = struct.unpack("!BB", f.read(2))
methods = array("B")
methods.fromfile(f, nmethods)
methods.fromstring(f.read(nmethods))
return cls(ver, methods)
def to_file(self, f):
struct.pack_into("!BB", f, 0, self.ver, len(self.methods))
self.methods.tofile(f)
f.write(struct.pack("!BB", self.ver, len(self.methods)))
f.write(self.methods.tostring())
class ServerGreeting(object):
__slots__ = ("ver", "method")
@ -71,72 +73,55 @@ class ServerGreeting(object):
@classmethod
def from_file(cls, f):
ver, method = struct.unpack_from("!BB", f)
ver, method = struct.unpack("!BB", f.read(2))
return cls(ver, method)
def to_file(self, f):
struct.pack_into("!BB", f, 0, self.ver, self.method)
f.write(struct.pack("!BB", self.ver, self.method))
class Message(object):
__slots__ = ("ver", "msg", "atyp", "addr")
class Request(object):
__slots__ = ("ver", "cmd", "atyp", "dst")
def __init__(self, ver, cmd, atyp, dst):
def __init__(self, ver, msg, atyp, addr):
self.ver = ver
self.cmd = cmd
self.msg = msg
self.atyp = atyp
self.dst = dst
self.addr = addr
@classmethod
def from_file(cls, f):
ver, cmd, rsv, atyp = struct.unpack_from("!BBBB", f)
ver, msg, rsv, atyp = struct.unpack("!BBBB", f.read(4))
if rsv != 0x00:
raise SocksError(REP.GENERAL_SOCKS_SERVER_FAILURE,
"Socks Request: Invalid reserved byte: %s" % rsv)
if atyp == ATYP.IPV4_ADDRESS:
host = socket.inet_ntoa(f.read(4)) # We use tnoa here as ntop is not commonly available on Windows.
host = utils.inet_ntop(socket.AF_INET, f.read(4)) # We use tnoa here as ntop is not commonly available on Windows.
use_ipv6 = False
elif atyp == ATYP.IPV6_ADDRESS:
host = socket.inet_ntop(socket.AF_INET6, f.read(16))
host = utils.inet_ntop(socket.AF_INET6, f.read(16))
use_ipv6 = True
elif atyp == ATYP.DOMAINNAME:
length = struct.unpack_from("!B", f)
length, = struct.unpack("!B", f.read(1))
host = f.read(length)
use_ipv6 = False
else:
raise SocksError(REP.ADDRESS_TYPE_NOT_SUPPORTED,
"Socks Request: Unknown ATYP: %s" % atyp)
port = struct.unpack_from("!H", f)
dst = Address(host, port, use_ipv6=use_ipv6)
return Request(ver, cmd, atyp, dst)
port, = struct.unpack("!H", f.read(2))
addr = tcp.Address((host, port), use_ipv6=use_ipv6)
return cls(ver, msg, atyp, addr)
def to_file(self, f):
raise NotImplementedError()
class Reply(object):
__slots__ = ("ver", "rep", "atyp", "bnd")
def __init__(self, ver, rep, atyp, bnd):
self.ver = ver
self.rep = rep
self.atyp = atyp
self.bnd = bnd
@classmethod
def from_file(cls, f):
raise NotImplementedError()
def to_file(self, f):
struct.pack_into("!BBBB", f, 0, self.ver, self.rep, 0x00, self.atyp)
f.write(struct.pack("!BBBB", self.ver, self.msg, 0x00, self.atyp))
if self.atyp == ATYP.IPV4_ADDRESS:
f.write(socket.inet_aton(self.bnd.host))
f.write(utils.inet_pton(socket.AF_INET, self.addr.host))
elif self.atyp == ATYP.IPV6_ADDRESS:
f.write(socket.inet_pton(socket.AF_INET6, self.bnd.host))
f.write(utils.inet_pton(socket.AF_INET6, self.addr.host))
elif self.atyp == ATYP.DOMAINNAME:
struct.pack_into("!B", f, 0, len(self.bnd.host))
f.write(self.bnd.host)
f.write(struct.pack("!B", len(self.addr.host)))
f.write(self.addr.host)
else:
raise SocksError(REP.ADDRESS_TYPE_NOT_SUPPORTED, "Unknown ATYP: %s" % self.atyp)
struct.pack_into("!H", f, 0, self.bnd.port)
f.write(struct.pack("!H", self.addr.port))

55
test/test_socks.py Normal file
View File

@ -0,0 +1,55 @@
from cStringIO import StringIO
import socket
from netlib import socks, utils
import tutils
def test_client_greeting():
raw = StringIO("\x05\x02\x00\xBE\xEF")
out = StringIO()
msg = socks.ClientGreeting.from_file(raw)
msg.to_file(out)
assert out.getvalue() == raw.getvalue()[:-1]
assert msg.ver == 5
assert len(msg.methods) == 2
assert 0xBE in msg.methods
assert 0xEF not in msg.methods
def test_server_greeting():
raw = StringIO("\x05\x02")
out = StringIO()
msg = socks.ServerGreeting.from_file(raw)
msg.to_file(out)
assert out.getvalue() == raw.getvalue()
assert msg.ver == 5
assert msg.method == 0x02
def test_message():
raw = StringIO("\x05\x01\x00\x03\x0bexample.com\xDE\xAD\xBE\xEF")
out = StringIO()
msg = socks.Message.from_file(raw)
assert raw.read(2) == "\xBE\xEF"
msg.to_file(out)
assert out.getvalue() == raw.getvalue()[:-2]
assert msg.ver == 5
assert msg.msg == 0x01
assert msg.atyp == 0x03
assert msg.addr == ("example.com", 0xDEAD)
# Test ATYP=0x01 (IPV4)
raw = StringIO("\x05\x01\x00\x01\x7f\x00\x00\x01\xDE\xAD\xBE\xEF")
msg = socks.Message.from_file(raw)
assert raw.read(2) == "\xBE\xEF"
assert msg.addr == ("127.0.0.1", 0xDEAD)
# Test ATYP=0x04 (IPV6)
ipv6_addr = "2001:0db8:85a3:08d3:1319:8a2e:0370:7344"
raw = StringIO("\x05\x01\x00\x04" + utils.inet_pton(socket.AF_INET6, ipv6_addr) + "\xDE\xAD\xBE\xEF")
msg = socks.Message.from_file(raw)
assert raw.read(2) == "\xBE\xEF"
assert msg.addr.host == ipv6_addr