Increase net.tcp.py coverage (#2336)

This commit is contained in:
Ujjwal Verma 2017-05-17 20:57:57 +05:30 committed by Thomas Kriechbaumer
parent feff5bd138
commit 5833b218b2
6 changed files with 56 additions and 18 deletions

View File

@ -18,13 +18,13 @@ matrix:
language: generic
env: TOXENV=py35 BDIST=1
- python: 3.5
env: TOXENV=py35 OPENSSL_OLD
env: TOXENV=py35 OPENSSL=old
addons:
apt:
packages:
- libssl-dev
- python: 3.5
env: TOXENV=py35 BDIST=1 OPENSSL_ALPN
env: TOXENV=py35 BDIST=1 OPENSSL=with-alpn
addons:
apt:
sources:
@ -34,7 +34,7 @@ matrix:
packages:
- libssl-dev
- python: 3.6
env: TOXENV=py36 OPENSSL_ALPN
env: TOXENV=py36 OPENSSL=with-alpn
addons:
apt:
sources:

View File

@ -511,7 +511,7 @@ class _Connection:
if log_ssl_key:
context.set_info_callback(log_ssl_key)
if HAS_ALPN:
if HAS_ALPN: # pragma: openssl-old no cover
if alpn_protos is not None:
# advertise application layer protocols
context.set_alpn_protos(alpn_protos)
@ -671,11 +671,11 @@ class TCPClient(_Connection):
if self.spoof_source_address:
try:
if not sock.getsockopt(socket.SOL_IP, socket.IP_TRANSPARENT):
sock.setsockopt(socket.SOL_IP, socket.IP_TRANSPARENT, 1)
sock.setsockopt(socket.SOL_IP, socket.IP_TRANSPARENT, 1) # pragma: windows no cover pragma: osx no cover
except Exception as e:
# socket.IP_TRANSPARENT might not be available on every OS and Python version
raise exceptions.TcpException(
"Failed to spoof the source address: " + e.strerror
"Failed to spoof the source address: " + str(e)
)
sock.connect(sa)
return sock
@ -688,7 +688,7 @@ class TCPClient(_Connection):
if err is not None:
raise err
else:
raise socket.error("getaddrinfo returns an empty list")
raise socket.error("getaddrinfo returns an empty list") # pragma: no cover
def connect(self):
try:
@ -711,7 +711,7 @@ class TCPClient(_Connection):
return self.connection.gettimeout()
def get_alpn_proto_negotiated(self):
if HAS_ALPN and self.ssl_established:
if HAS_ALPN and self.ssl_established: # pragma: openssl-old no cover
return self.connection.get_alpn_proto_negotiated()
else:
return b""
@ -818,7 +818,7 @@ class BaseHandler(_Connection):
self.connection.settimeout(n)
def get_alpn_proto_negotiated(self):
if HAS_ALPN and self.ssl_established:
if HAS_ALPN and self.ssl_established: # pragma: openssl-old no cover
return self.connection.get_alpn_proto_negotiated()
else:
return b""
@ -852,7 +852,7 @@ class TCPServer:
self.__is_shut_down.set()
self.__shutdown_request = False
if self.address == 'localhost':
if self.address[0] == 'localhost':
raise socket.error("Binding to 'localhost' is prohibited. Please use '::1' or '127.0.0.1' directly.")
try:

View File

@ -22,8 +22,6 @@ exclude_lines =
[tool:full_coverage]
exclude =
mitmproxy/contentviews/wbxml.py
mitmproxy/net/tcp.py
mitmproxy/net/http/encoding.py
mitmproxy/proxy/protocol/
mitmproxy/proxy/config.py
mitmproxy/proxy/root_context.py

View File

@ -1,6 +1,7 @@
import os
import configparser
import pytest
import sys
here = os.path.abspath(os.path.dirname(__file__))
@ -59,6 +60,12 @@ def pytest_runtestloop(session):
if os.name == 'nt':
cov.exclude('pragma: windows no cover')
if sys.platform == 'darwin':
cov.exclude('pragma: osx no cover')
if os.environ.get("OPENSSL") == "old":
cov.exclude('pragma: openssl-old no cover')
yield
coverage_values = dict([(name, 0) for name in pytest.config.option.full_cov])

View File

@ -529,10 +529,10 @@ class TestTimeOut(tservers.ServerTestBase):
class TestCryptographyALPN:
def test_has_alpn(self):
if 'OPENSSL_ALPN' in os.environ:
if os.environ.get("OPENSSL") == "with-alpn":
assert tcp.HAS_ALPN
assert SSL._lib.Cryptography_HAS_ALPN
elif 'OPENSSL_OLD' in os.environ:
elif os.environ.get("OPENSSL") == "old":
assert not tcp.HAS_ALPN
assert not SSL._lib.Cryptography_HAS_ALPN
@ -603,13 +603,36 @@ class TestDHParams(tservers.ServerTestBase):
assert ret[0] == "DHE-RSA-AES256-SHA"
class TestTCPClient:
class TestTCPClient(tservers.ServerTestBase):
def test_conerr(self):
c = tcp.TCPClient(("127.0.0.1", 0))
with pytest.raises(exceptions.TcpException):
with pytest.raises(exceptions.TcpException, match="Error connecting"):
c.connect()
def test_timeout(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.create_connection(timeout=20) as conn:
assert conn.gettimeout() == 20
def test_spoof_address(self):
c = tcp.TCPClient(("127.0.0.1", self.port), spoof_source_address=("127.0.0.1", 0))
with pytest.raises(exceptions.TcpException, match="Failed to spoof"):
c.connect()
class TestTCPServer:
def test_binderr(self):
with pytest.raises(socket.error, match="prohibited"):
tcp.TCPServer(("localhost", 8080))
def test_wait_for_silence(self):
s = tcp.TCPServer(("127.0.0.1", 0))
with s.handler_counter:
with pytest.raises(exceptions.Timeout):
s.wait_for_silence()
class TestFileLike:
@ -811,7 +834,7 @@ class TestSSLKeyLogger(tservers.ServerTestBase):
assert not tcp.SSLKeyLogger.create_logfun(False)
class TestSSLInvalidMethod(tservers.ServerTestBase):
class TestSSLInvalid(tservers.ServerTestBase):
handler = EchoHandler
ssl = True
@ -821,3 +844,13 @@ class TestSSLInvalidMethod(tservers.ServerTestBase):
with c.connect():
with pytest.raises(exceptions.TlsException):
c.convert_to_ssl(method=fake_ssl_method)
def test_alpn_error(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
if tcp.HAS_ALPN:
with pytest.raises(exceptions.TlsException, match="must be a function"):
c.create_ssl_context(alpn_select_callback="foo")
with pytest.raises(exceptions.TlsException, match="ALPN error"):
c.create_ssl_context(alpn_select="foo", alpn_select_callback="bar")

View File

@ -7,7 +7,7 @@ toxworkdir={env:TOX_WORK_DIR:.tox}
deps =
{env:CI_DEPS:}
-rrequirements.txt
passenv = CODECOV_TOKEN CI CI_* TRAVIS TRAVIS_* APPVEYOR APPVEYOR_* SNAPSHOT_* OPENSSL_* RTOOL_*
passenv = CODECOV_TOKEN CI CI_* TRAVIS TRAVIS_* APPVEYOR APPVEYOR_* SNAPSHOT_* OPENSSL RTOOL_*
setenv = HOME = {envtmpdir}
commands =
mitmdump --version