mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-27 02:24:18 +00:00
remove ntop windows workaround
This commit is contained in:
parent
6405595ae8
commit
e69133f98c
@ -96,10 +96,10 @@ class Message(object):
|
|||||||
"Socks Request: Invalid reserved byte: %s" % rsv)
|
"Socks Request: Invalid reserved byte: %s" % rsv)
|
||||||
|
|
||||||
if atyp == ATYP.IPV4_ADDRESS:
|
if atyp == ATYP.IPV4_ADDRESS:
|
||||||
host = utils.inet_ntop(socket.AF_INET, f.read(4)) # We use tnoa here as ntop is not commonly available on Windows.
|
host = socket.inet_ntoa(f.read(4)) # We use tnoa here as ntop is not commonly available on Windows.
|
||||||
use_ipv6 = False
|
use_ipv6 = False
|
||||||
elif atyp == ATYP.IPV6_ADDRESS:
|
elif atyp == ATYP.IPV6_ADDRESS:
|
||||||
host = utils.inet_ntop(socket.AF_INET6, f.read(16))
|
host = socket.inet_ntop(socket.AF_INET6, f.read(16))
|
||||||
use_ipv6 = True
|
use_ipv6 = True
|
||||||
elif atyp == ATYP.DOMAINNAME:
|
elif atyp == ATYP.DOMAINNAME:
|
||||||
length, = struct.unpack("!B", f.read(1))
|
length, = struct.unpack("!B", f.read(1))
|
||||||
@ -116,9 +116,9 @@ class Message(object):
|
|||||||
def to_file(self, f):
|
def to_file(self, f):
|
||||||
f.write(struct.pack("!BBBB", self.ver, self.msg, 0x00, self.atyp))
|
f.write(struct.pack("!BBBB", self.ver, self.msg, 0x00, self.atyp))
|
||||||
if self.atyp == ATYP.IPV4_ADDRESS:
|
if self.atyp == ATYP.IPV4_ADDRESS:
|
||||||
f.write(utils.inet_pton(socket.AF_INET, self.addr.host))
|
f.write(socket.inet_aton(self.addr.host))
|
||||||
elif self.atyp == ATYP.IPV6_ADDRESS:
|
elif self.atyp == ATYP.IPV6_ADDRESS:
|
||||||
f.write(utils.inet_pton(socket.AF_INET6, self.addr.host))
|
f.write(socket.inet_pton(socket.AF_INET6, self.addr.host))
|
||||||
elif self.atyp == ATYP.DOMAINNAME:
|
elif self.atyp == ATYP.DOMAINNAME:
|
||||||
f.write(struct.pack("!B", len(self.addr.host)))
|
f.write(struct.pack("!B", len(self.addr.host)))
|
||||||
f.write(self.addr.host)
|
f.write(self.addr.host)
|
||||||
|
@ -44,24 +44,3 @@ def hexdump(s):
|
|||||||
(o, x, cleanBin(part, True))
|
(o, x, cleanBin(part, True))
|
||||||
)
|
)
|
||||||
return parts
|
return parts
|
||||||
|
|
||||||
|
|
||||||
def inet_ntop(address_family, packed_ip):
|
|
||||||
if hasattr(socket, "inet_ntop"):
|
|
||||||
return socket.inet_ntop(address_family, packed_ip)
|
|
||||||
# Windows Fallbacks
|
|
||||||
if address_family == socket.AF_INET:
|
|
||||||
return socket.inet_ntoa(packed_ip)
|
|
||||||
if address_family == socket.AF_INET6:
|
|
||||||
ip = packed_ip.encode("hex")
|
|
||||||
return ":".join([ip[i:i + 4] for i in range(0, len(ip), 4)])
|
|
||||||
|
|
||||||
|
|
||||||
def inet_pton(address_family, ip_string):
|
|
||||||
if hasattr(socket, "inet_pton"):
|
|
||||||
return socket.inet_pton(address_family, ip_string)
|
|
||||||
# Windows Fallbacks
|
|
||||||
if address_family == socket.AF_INET:
|
|
||||||
return socket.inet_aton(ip_string)
|
|
||||||
if address_family == socket.AF_INET6:
|
|
||||||
return ip_string.replace(":", "").decode("hex")
|
|
@ -1,5 +1,6 @@
|
|||||||
from cStringIO import StringIO
|
from cStringIO import StringIO
|
||||||
import socket
|
import socket
|
||||||
|
from nose.plugins.skip import SkipTest
|
||||||
from netlib import socks, utils
|
from netlib import socks, utils
|
||||||
import tutils
|
import tutils
|
||||||
|
|
||||||
@ -47,9 +48,13 @@ def test_message():
|
|||||||
assert raw.read(2) == "\xBE\xEF"
|
assert raw.read(2) == "\xBE\xEF"
|
||||||
assert msg.addr == ("127.0.0.1", 0xDEAD)
|
assert msg.addr == ("127.0.0.1", 0xDEAD)
|
||||||
|
|
||||||
|
|
||||||
|
def test_message_ipv6():
|
||||||
|
if not hasattr(socket, "inet_ntop"):
|
||||||
|
raise SkipTest("Skipped because inet_ntop is not available")
|
||||||
# Test ATYP=0x04 (IPV6)
|
# Test ATYP=0x04 (IPV6)
|
||||||
ipv6_addr = "2001:0db8:85a3:08d3:1319:8a2e:0370:7344"
|
ipv6_addr = "2001:0db8:85a3:08d3:1319:8a2e:0370:7344"
|
||||||
raw = StringIO("\x05\x01\x00\x04" + utils.inet_pton(socket.AF_INET6, ipv6_addr) + "\xDE\xAD\xBE\xEF")
|
raw = StringIO("\x05\x01\x00\x04" + socket.inet_pton(socket.AF_INET6, ipv6_addr) + "\xDE\xAD\xBE\xEF")
|
||||||
msg = socks.Message.from_file(raw)
|
msg = socks.Message.from_file(raw)
|
||||||
assert raw.read(2) == "\xBE\xEF"
|
assert raw.read(2) == "\xBE\xEF"
|
||||||
assert msg.addr.host == ipv6_addr
|
assert msg.addr.host == ipv6_addr
|
@ -10,21 +10,3 @@ def test_cleanBin():
|
|||||||
assert utils.cleanBin("\00ne") == ".ne"
|
assert utils.cleanBin("\00ne") == ".ne"
|
||||||
assert utils.cleanBin("\nne") == "\nne"
|
assert utils.cleanBin("\nne") == "\nne"
|
||||||
assert utils.cleanBin("\nne", True) == ".ne"
|
assert utils.cleanBin("\nne", True) == ".ne"
|
||||||
|
|
||||||
def test_ntop_pton():
|
|
||||||
for family, ip_string, packed_ip in (
|
|
||||||
(socket.AF_INET,
|
|
||||||
"127.0.0.1",
|
|
||||||
"\x7f\x00\x00\x01"),
|
|
||||||
(socket.AF_INET6,
|
|
||||||
"2001:0db8:85a3:08d3:1319:8a2e:0370:7344",
|
|
||||||
" \x01\r\xb8\x85\xa3\x08\xd3\x13\x19\x8a.\x03psD")):
|
|
||||||
assert ip_string == utils.inet_ntop(family, packed_ip)
|
|
||||||
assert packed_ip == utils.inet_pton(family, ip_string)
|
|
||||||
if hasattr(socket, "inet_ntop"):
|
|
||||||
ntop, pton = socket.inet_ntop, socket.inet_pton
|
|
||||||
delattr(socket,"inet_ntop")
|
|
||||||
delattr(socket,"inet_pton")
|
|
||||||
assert ip_string == utils.inet_ntop(family, packed_ip)
|
|
||||||
assert packed_ip == utils.inet_pton(family, ip_string)
|
|
||||||
socket.inet_ntop, socket.inet_pton = ntop, pton
|
|
Loading…
Reference in New Issue
Block a user