mitmproxy/test/netlib/test_tcp.py

803 lines
23 KiB
Python
Raw Normal View History

2015-09-20 16:12:55 +00:00
from io import BytesIO
2016-10-17 03:38:31 +00:00
import queue
import time
import socket
import random
2015-02-27 21:02:52 +00:00
import os
2015-04-08 22:57:37 +00:00
import threading
import mock
from OpenSSL import SSL
2016-10-19 22:02:52 +00:00
from mitmproxy import certs
from netlib import tcp
from netlib import tutils
from netlib import exceptions
2012-06-18 21:42:32 +00:00
2016-02-16 20:31:07 +00:00
from . import tservers
2016-05-29 11:33:20 +00:00
class EchoHandler(tcp.BaseHandler):
2012-06-25 21:50:42 +00:00
sni = None
2012-06-25 21:50:42 +00:00
def handle_sni(self, connection):
self.sni = connection.get_servername()
2012-06-18 21:42:32 +00:00
def handle(self):
v = self.rfile.readline()
2013-01-27 06:21:18 +00:00
self.wfile.write(v)
2012-06-18 21:42:32 +00:00
self.wfile.flush()
class ClientCipherListHandler(tcp.BaseHandler):
sni = None
def handle(self):
self.wfile.write("%s" % self.connection.get_cipher_list())
self.wfile.flush()
2012-07-21 04:10:54 +00:00
class HangHandler(tcp.BaseHandler):
2012-07-21 04:10:54 +00:00
def handle(self):
# Hang as long as the client connection is alive
while True:
try:
self.connection.setblocking(0)
ret = self.connection.recv(1)
# Client connection is dead...
2016-06-14 04:13:50 +00:00
if ret == "" or ret == b"":
return
except socket.error:
pass
except SSL.WantReadError:
pass
except Exception:
return
time.sleep(0.1)
2012-07-21 04:10:54 +00:00
2015-06-15 15:31:08 +00:00
class ALPNHandler(tcp.BaseHandler):
sni = None
def handle(self):
alp = self.get_alpn_proto_negotiated()
if alp:
2015-09-20 18:35:45 +00:00
self.wfile.write(alp)
2015-06-15 15:31:08 +00:00
else:
2015-09-20 17:56:45 +00:00
self.wfile.write(b"NONE")
2015-06-15 15:31:08 +00:00
self.wfile.flush()
2015-06-22 02:52:23 +00:00
class TestServer(tservers.ServerTestBase):
2013-01-25 03:03:59 +00:00
handler = EchoHandler
2012-06-18 21:42:32 +00:00
def test_echo(self):
2015-09-20 18:35:45 +00:00
testval = b"echo!\n"
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.wfile.write(testval)
c.wfile.flush()
assert c.rfile.readline() == testval
2012-06-18 21:42:32 +00:00
2015-04-08 22:57:37 +00:00
def test_thread_start_error(self):
with mock.patch.object(threading.Thread, "start", side_effect=threading.ThreadError("nonewthread")) as m:
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
assert not c.rfile.read(1)
assert m.called
assert "nonewthread" in self.q.get_nowait()
2015-04-08 22:57:37 +00:00
self.test_echo()
2015-06-22 02:52:23 +00:00
class TestServerBind(tservers.ServerTestBase):
2014-03-10 04:29:27 +00:00
class handler(tcp.BaseHandler):
2014-03-10 04:29:27 +00:00
def handle(self):
2015-09-20 18:35:45 +00:00
self.wfile.write(str(self.connection.getpeername()).encode())
2014-03-10 04:29:27 +00:00
self.wfile.flush()
def test_bind(self):
""" Test to bind to a given random port. Try again if the random port turned out to be blocked. """
for i in range(20):
random_port = random.randrange(1024, 65535)
try:
2015-05-30 00:02:58 +00:00
c = tcp.TCPClient(
("127.0.0.1", self.port), source_address=(
"127.0.0.1", random_port))
with c.connect():
assert c.rfile.readline() == str(("127.0.0.1", random_port)).encode()
return
2016-10-19 22:02:52 +00:00
except exceptions.TcpException: # port probably already in use
pass
2015-06-22 02:52:23 +00:00
class TestServerIPv6(tservers.ServerTestBase):
handler = EchoHandler
addr = tcp.Address(("localhost", 0), use_ipv6=True)
def test_echo(self):
2015-09-20 18:35:45 +00:00
testval = b"echo!\n"
c = tcp.TCPClient(tcp.Address(("::1", self.port), use_ipv6=True))
with c.connect():
c.wfile.write(testval)
c.wfile.flush()
assert c.rfile.readline() == testval
2013-01-27 06:21:18 +00:00
2015-06-22 02:52:23 +00:00
class TestEcho(tservers.ServerTestBase):
2013-01-25 03:03:59 +00:00
handler = EchoHandler
def test_echo(self):
2015-09-20 18:35:45 +00:00
testval = b"echo!\n"
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.wfile.write(testval)
c.wfile.flush()
assert c.rfile.readline() == testval
2014-03-10 16:43:39 +00:00
class HardDisconnectHandler(tcp.BaseHandler):
2014-03-10 16:43:39 +00:00
def handle(self):
self.connection.close()
2015-06-22 02:52:23 +00:00
class TestFinishFail(tservers.ServerTestBase):
2015-02-27 21:02:52 +00:00
"""
This tests a difficult-to-trigger exception in the .finish() method of
the handler.
"""
handler = EchoHandler
2015-02-27 21:02:52 +00:00
def test_disconnect_in_finish(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.wfile.write(b"foo\n")
2016-10-19 22:02:52 +00:00
c.wfile.flush = mock.Mock(side_effect=exceptions.TcpDisconnect)
c.finish()
2015-02-27 21:02:52 +00:00
2015-06-22 02:52:23 +00:00
class TestServerSSL(tservers.ServerTestBase):
2013-01-25 03:03:59 +00:00
handler = EchoHandler
ssl = dict(
cipher_list="AES256-SHA",
chain_file=tutils.test_data.path("data/server.crt")
)
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
2016-07-07 04:03:17 +00:00
c.convert_to_ssl(sni="foo.com", options=SSL.OP_ALL)
testval = b"echo!\n"
c.wfile.write(testval)
c.wfile.flush()
assert c.rfile.readline() == testval
2012-06-18 21:42:32 +00:00
def test_get_current_cipher(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
assert not c.get_current_cipher()
2016-07-07 04:03:17 +00:00
c.convert_to_ssl(sni="foo.com")
ret = c.get_current_cipher()
assert ret
assert "AES" in ret[0]
2012-06-18 21:42:32 +00:00
2015-06-22 02:52:23 +00:00
class TestSSLv3Only(tservers.ServerTestBase):
2013-01-25 03:03:59 +00:00
handler = EchoHandler
ssl = dict(
request_client_cert=False,
v3_only=True
2013-01-25 03:03:59 +00:00
)
def test_failure(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
2016-10-19 22:02:52 +00:00
tutils.raises(exceptions.TlsException, c.convert_to_ssl, sni="foo.com")
class TestSSLUpstreamCertVerificationWBadServerCert(tservers.ServerTestBase):
handler = EchoHandler
ssl = dict(
cert=tutils.test_data.path("data/verificationcerts/self-signed.crt"),
key=tutils.test_data.path("data/verificationcerts/self-signed.key")
2015-09-20 18:35:45 +00:00
)
def test_mode_default_should_pass(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl()
# Verification errors should be saved even if connection isn't aborted
# aborted
2016-07-28 04:01:28 +00:00
assert c.ssl_verification_error
testval = b"echo!\n"
c.wfile.write(testval)
c.wfile.flush()
assert c.rfile.readline() == testval
def test_mode_none_should_pass(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl(verify_options=SSL.VERIFY_NONE)
# Verification errors should be saved even if connection isn't aborted
2016-07-28 04:01:28 +00:00
assert c.ssl_verification_error
testval = b"echo!\n"
c.wfile.write(testval)
c.wfile.flush()
assert c.rfile.readline() == testval
def test_mode_strict_should_fail(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
2016-10-19 22:02:52 +00:00
with tutils.raises(exceptions.InvalidCertificateException):
c.convert_to_ssl(
2016-07-07 04:03:17 +00:00
sni="example.mitmproxy.org",
verify_options=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
)
2016-07-28 04:01:28 +00:00
assert c.ssl_verification_error
# Unknown issuing certificate authority for first certificate
2016-07-28 04:01:28 +00:00
assert "errno: 18" in str(c.ssl_verification_error)
assert "depth: 0" in str(c.ssl_verification_error)
class TestSSLUpstreamCertVerificationWBadHostname(tservers.ServerTestBase):
handler = EchoHandler
ssl = dict(
cert=tutils.test_data.path("data/verificationcerts/trusted-leaf.crt"),
key=tutils.test_data.path("data/verificationcerts/trusted-leaf.key")
)
def test_should_fail_without_sni(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
2016-10-19 22:02:52 +00:00
with tutils.raises(exceptions.TlsException):
c.convert_to_ssl(
verify_options=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
)
def test_should_fail(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
2016-10-19 22:02:52 +00:00
with tutils.raises(exceptions.InvalidCertificateException):
c.convert_to_ssl(
2016-07-07 04:03:17 +00:00
sni="mitmproxy.org",
verify_options=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
)
2016-07-28 04:01:28 +00:00
assert c.ssl_verification_error
class TestSSLUpstreamCertVerificationWValidCertChain(tservers.ServerTestBase):
handler = EchoHandler
ssl = dict(
cert=tutils.test_data.path("data/verificationcerts/trusted-leaf.crt"),
key=tutils.test_data.path("data/verificationcerts/trusted-leaf.key")
)
def test_mode_strict_w_pemfile_should_pass(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl(
2016-07-07 04:03:17 +00:00
sni="example.mitmproxy.org",
verify_options=SSL.VERIFY_PEER,
ca_pemfile=tutils.test_data.path("data/verificationcerts/trusted-root.crt")
)
assert c.ssl_verification_error is None
testval = b"echo!\n"
c.wfile.write(testval)
c.wfile.flush()
assert c.rfile.readline() == testval
def test_mode_strict_w_cadir_should_pass(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl(
2016-07-07 04:03:17 +00:00
sni="example.mitmproxy.org",
verify_options=SSL.VERIFY_PEER,
ca_path=tutils.test_data.path("data/verificationcerts/")
)
assert c.ssl_verification_error is None
testval = b"echo!\n"
c.wfile.write(testval)
c.wfile.flush()
assert c.rfile.readline() == testval
2015-06-22 02:52:23 +00:00
class TestSSLClientCert(tservers.ServerTestBase):
2014-03-10 04:29:27 +00:00
class handler(tcp.BaseHandler):
sni = None
2014-03-10 04:29:27 +00:00
def handle_sni(self, connection):
self.sni = connection.get_servername()
def handle(self):
2015-09-20 18:35:45 +00:00
self.wfile.write(b"%d\n" % self.clientcert.serial)
2014-03-10 04:29:27 +00:00
self.wfile.flush()
2013-01-25 03:03:59 +00:00
ssl = dict(
request_client_cert=True,
v3_only=False
2013-01-25 03:03:59 +00:00
)
def test_clientcert(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl(
cert=tutils.test_data.path("data/clientcert/client.pem"))
assert c.rfile.readline().strip() == b"1"
def test_clientcert_err(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
tutils.raises(
2016-10-19 22:02:52 +00:00
exceptions.TlsException,
c.convert_to_ssl,
cert=tutils.test_data.path("data/clientcert/make")
)
2015-06-22 02:52:23 +00:00
class TestSNI(tservers.ServerTestBase):
2014-03-10 04:29:27 +00:00
class handler(tcp.BaseHandler):
sni = None
2014-03-10 04:29:27 +00:00
def handle_sni(self, connection):
self.sni = connection.get_servername()
def handle(self):
self.wfile.write(self.sni)
self.wfile.flush()
ssl = True
2012-06-25 21:50:42 +00:00
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
2016-07-07 04:03:17 +00:00
c.convert_to_ssl(sni="foo.com")
assert c.sni == "foo.com"
assert c.rfile.readline() == b"foo.com"
2012-06-25 21:50:42 +00:00
2015-06-22 02:52:23 +00:00
class TestServerCipherList(tservers.ServerTestBase):
handler = ClientCipherListHandler
ssl = dict(
cipher_list='RC4-SHA'
)
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
2016-07-07 04:03:17 +00:00
c.convert_to_ssl(sni="foo.com")
assert c.rfile.readline() == b"['RC4-SHA']"
2015-06-22 02:52:23 +00:00
class TestServerCurrentCipher(tservers.ServerTestBase):
2014-03-10 04:29:27 +00:00
class handler(tcp.BaseHandler):
sni = None
2014-03-10 04:29:27 +00:00
def handle(self):
2015-09-20 18:35:45 +00:00
self.wfile.write(str(self.get_current_cipher()).encode())
2014-03-10 04:29:27 +00:00
self.wfile.flush()
ssl = dict(
cipher_list='RC4-SHA'
)
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
2016-07-07 04:03:17 +00:00
c.convert_to_ssl(sni="foo.com")
assert b"RC4-SHA" in c.rfile.readline()
2015-06-22 02:52:23 +00:00
class TestServerCipherListError(tservers.ServerTestBase):
handler = ClientCipherListHandler
ssl = dict(
cipher_list='bogus'
)
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
2016-07-07 04:03:17 +00:00
tutils.raises("handshake error", c.convert_to_ssl, sni="foo.com")
2015-06-22 02:52:23 +00:00
class TestClientCipherListError(tservers.ServerTestBase):
handler = ClientCipherListHandler
ssl = dict(
cipher_list='RC4-SHA'
)
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
tutils.raises(
"cipher specification",
c.convert_to_ssl,
2016-07-07 04:03:17 +00:00
sni="foo.com",
cipher_list="bogus"
)
2015-06-22 02:52:23 +00:00
class TestSSLDisconnect(tservers.ServerTestBase):
2014-03-10 04:29:27 +00:00
class handler(tcp.BaseHandler):
2014-03-10 04:29:27 +00:00
def handle(self):
2014-10-23 13:31:42 +00:00
self.finish()
ssl = True
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl()
# Excercise SSL.ZeroReturnError
c.rfile.read(10)
c.close()
2016-10-19 22:02:52 +00:00
tutils.raises(exceptions.TcpDisconnect, c.wfile.write, b"foo")
tutils.raises(queue.Empty, self.q.get_nowait)
2015-06-22 02:52:23 +00:00
class TestSSLHardDisconnect(tservers.ServerTestBase):
2014-03-10 16:43:39 +00:00
handler = HardDisconnectHandler
ssl = True
2014-03-10 16:43:39 +00:00
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl()
# Exercise SSL.SysCallError
c.rfile.read(10)
c.close()
2016-10-19 22:02:52 +00:00
tutils.raises(exceptions.TcpDisconnect, c.wfile.write, b"foo")
2014-03-10 16:43:39 +00:00
2015-06-22 02:52:23 +00:00
class TestDisconnect(tservers.ServerTestBase):
def test_echo(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.rfile.read(10)
c.wfile.write(b"foo")
c.close()
c.close()
2015-06-22 02:52:23 +00:00
class TestServerTimeOut(tservers.ServerTestBase):
2014-03-10 04:29:27 +00:00
class handler(tcp.BaseHandler):
2014-03-10 04:29:27 +00:00
def handle(self):
self.timeout = False
self.settimeout(0.01)
try:
self.rfile.read(10)
2016-10-19 22:02:52 +00:00
except exceptions.TcpTimeout:
2014-03-10 04:29:27 +00:00
self.timeout = True
def test_timeout(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
time.sleep(0.3)
assert self.last_handler.timeout
2015-06-22 02:52:23 +00:00
class TestTimeOut(tservers.ServerTestBase):
2013-01-25 03:03:59 +00:00
handler = HangHandler
def test_timeout(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
2016-06-14 02:34:30 +00:00
with c.connect():
c.settimeout(0.1)
assert c.gettimeout() == 0.1
2016-10-19 22:02:52 +00:00
tutils.raises(exceptions.TcpTimeout, c.rfile.read, 10)
2012-07-21 04:10:54 +00:00
2015-06-22 02:52:23 +00:00
class TestALPNClient(tservers.ServerTestBase):
2015-06-15 15:31:08 +00:00
handler = ALPNHandler
2015-05-28 15:46:44 +00:00
ssl = dict(
2015-09-20 18:35:45 +00:00
alpn_select=b"bar"
2015-05-28 15:46:44 +00:00
)
2016-02-15 17:43:06 +00:00
if tcp.HAS_ALPN:
def test_alpn(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl(alpn_protos=[b"foo", b"bar", b"fasel"])
assert c.get_alpn_proto_negotiated() == b"bar"
assert c.rfile.readline().strip() == b"bar"
2015-05-28 15:46:44 +00:00
2015-06-12 13:21:23 +00:00
def test_no_alpn(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl()
assert c.get_alpn_proto_negotiated() == b""
assert c.rfile.readline().strip() == b"NONE"
2015-06-12 13:21:23 +00:00
else:
def test_none_alpn(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl(alpn_protos=[b"foo", b"bar", b"fasel"])
assert c.get_alpn_proto_negotiated() == b""
assert c.rfile.readline() == b"NONE"
2015-06-15 15:31:08 +00:00
2015-06-22 02:52:23 +00:00
class TestNoSSLNoALPNClient(tservers.ServerTestBase):
2015-06-15 15:31:08 +00:00
handler = ALPNHandler
def test_no_ssl_no_alpn(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
assert c.get_alpn_proto_negotiated() == b""
assert c.rfile.readline().strip() == b"NONE"
2015-05-28 15:46:44 +00:00
2015-06-22 02:52:23 +00:00
class TestSSLTimeOut(tservers.ServerTestBase):
2013-01-25 03:03:59 +00:00
handler = HangHandler
ssl = True
2012-07-21 04:10:54 +00:00
def test_timeout_client(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl()
c.settimeout(0.1)
2016-10-19 22:02:52 +00:00
tutils.raises(exceptions.TcpTimeout, c.rfile.read, 10)
2012-07-21 04:10:54 +00:00
2015-06-22 02:52:23 +00:00
class TestDHParams(tservers.ServerTestBase):
2014-03-07 03:38:50 +00:00
handler = HangHandler
ssl = dict(
2016-10-19 22:02:52 +00:00
dhparams=certs.CertStore.load_dhparam(
2014-03-07 03:38:50 +00:00
tutils.test_data.path("data/dhparam.pem"),
),
cipher_list="DHE-RSA-AES256-SHA"
2014-03-07 03:38:50 +00:00
)
2014-03-07 03:38:50 +00:00
def test_dhparams(self):
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl()
ret = c.get_current_cipher()
assert ret[0] == "DHE-RSA-AES256-SHA"
2014-03-07 03:38:50 +00:00
2015-02-27 21:02:52 +00:00
def test_create_dhparams(self):
with tutils.tmpdir() as d:
filename = os.path.join(d, "dhparam.pem")
2016-10-19 22:02:52 +00:00
certs.CertStore.load_dhparam(filename)
2015-02-27 21:02:52 +00:00
assert os.path.exists(filename)
2014-03-07 03:38:50 +00:00
2014-03-10 04:29:27 +00:00
2012-06-18 21:42:32 +00:00
class TestTCPClient:
2012-06-18 21:42:32 +00:00
def test_conerr(self):
c = tcp.TCPClient(("127.0.0.1", 0))
2016-10-19 22:02:52 +00:00
tutils.raises(exceptions.TcpException, c.connect)
2012-06-18 21:42:32 +00:00
class TestFileLike:
2012-10-09 03:25:15 +00:00
def test_blocksize(self):
2015-09-20 16:12:55 +00:00
s = BytesIO(b"1234567890abcdefghijklmnopqrstuvwxyz")
2012-10-09 03:25:15 +00:00
s = tcp.Reader(s)
s.BLOCKSIZE = 2
2015-09-20 18:35:45 +00:00
assert s.read(1) == b"1"
assert s.read(2) == b"23"
assert s.read(3) == b"456"
assert s.read(4) == b"7890"
2012-10-09 03:25:15 +00:00
d = s.read(-1)
2015-09-20 18:35:45 +00:00
assert d.startswith(b"abc") and d.endswith(b"xyz")
2012-10-09 03:25:15 +00:00
2012-06-18 21:42:32 +00:00
def test_wrap(self):
2015-09-20 16:12:55 +00:00
s = BytesIO(b"foobar\nfoobar")
2012-06-18 21:42:32 +00:00
s.flush()
s = tcp.Reader(s)
2015-09-20 18:35:45 +00:00
assert s.readline() == b"foobar\n"
assert s.readline() == b"foobar"
2012-06-18 21:42:32 +00:00
# Test __getattr__
assert s.isatty
def test_limit(self):
2015-09-20 16:12:55 +00:00
s = BytesIO(b"foobar\nfoobar")
s = tcp.Reader(s)
2015-09-20 18:35:45 +00:00
assert s.readline(3) == b"foo"
def test_limitless(self):
2015-09-20 16:12:55 +00:00
s = BytesIO(b"f" * (50 * 1024))
s = tcp.Reader(s)
ret = s.read(-1)
assert len(ret) == 50 * 1024
def test_readlog(self):
2015-09-20 16:12:55 +00:00
s = BytesIO(b"foobar\nfoobar")
s = tcp.Reader(s)
assert not s.is_logging()
s.start_log()
assert s.is_logging()
s.readline()
2015-09-20 18:35:45 +00:00
assert s.get_log() == b"foobar\n"
s.read(1)
2015-09-20 18:35:45 +00:00
assert s.get_log() == b"foobar\nf"
s.start_log()
2015-09-20 18:35:45 +00:00
assert s.get_log() == b""
s.read(1)
2015-09-20 18:35:45 +00:00
assert s.get_log() == b"o"
s.stop_log()
tutils.raises(ValueError, s.get_log)
def test_writelog(self):
2015-09-20 16:12:55 +00:00
s = BytesIO()
s = tcp.Writer(s)
s.start_log()
assert s.is_logging()
2015-09-20 18:35:45 +00:00
s.write(b"x")
assert s.get_log() == b"x"
s.write(b"x")
assert s.get_log() == b"xx"
def test_writer_flush_error(self):
2015-09-20 16:12:55 +00:00
s = BytesIO()
s = tcp.Writer(s)
o = mock.MagicMock()
o.flush = mock.MagicMock(side_effect=socket.error)
s.o = o
2016-10-19 22:02:52 +00:00
tutils.raises(exceptions.TcpDisconnect, s.flush)
def test_reader_read_error(self):
2015-09-20 16:12:55 +00:00
s = BytesIO(b"foobar\nfoobar")
s = tcp.Reader(s)
o = mock.MagicMock()
o.read = mock.MagicMock(side_effect=socket.error)
s.o = o
2016-10-19 22:02:52 +00:00
tutils.raises(exceptions.TcpDisconnect, s.read, 10)
def test_reset_timestamps(self):
2015-09-20 16:12:55 +00:00
s = BytesIO(b"foobar\nfoobar")
s = tcp.Reader(s)
s.first_byte_timestamp = 500
s.reset_timestamps()
assert not s.first_byte_timestamp
def test_first_byte_timestamp_updated_on_read(self):
2015-09-20 16:12:55 +00:00
s = BytesIO(b"foobar\nfoobar")
s = tcp.Reader(s)
s.read(1)
assert s.first_byte_timestamp
expected = s.first_byte_timestamp
s.read(5)
assert s.first_byte_timestamp == expected
def test_first_byte_timestamp_updated_on_readline(self):
2015-09-20 16:12:55 +00:00
s = BytesIO(b"foobar\nfoobar\nfoobar")
s = tcp.Reader(s)
s.readline()
assert s.first_byte_timestamp
expected = s.first_byte_timestamp
s.readline()
assert s.first_byte_timestamp == expected
2013-01-27 06:21:18 +00:00
def test_read_ssl_error(self):
s = mock.MagicMock()
s.read = mock.MagicMock(side_effect=SSL.Error())
s = tcp.Reader(s)
2016-10-19 22:02:52 +00:00
tutils.raises(exceptions.TlsException, s.read, 1)
2015-02-27 21:02:52 +00:00
def test_read_syscall_ssl_error(self):
s = mock.MagicMock()
s.read = mock.MagicMock(side_effect=SSL.SysCallError())
s = tcp.Reader(s)
2016-10-19 22:02:52 +00:00
tutils.raises(exceptions.TlsException, s.read, 1)
2015-02-27 21:02:52 +00:00
def test_reader_readline_disconnect(self):
o = mock.MagicMock()
o.read = mock.MagicMock(side_effect=socket.error)
s = tcp.Reader(o)
2016-10-19 22:02:52 +00:00
tutils.raises(exceptions.TcpDisconnect, s.readline, 10)
2015-06-05 17:39:15 +00:00
def test_reader_incomplete_error(self):
2015-09-20 16:12:55 +00:00
s = BytesIO(b"foobar")
2015-06-05 17:39:15 +00:00
s = tcp.Reader(s)
2016-10-19 22:02:52 +00:00
tutils.raises(exceptions.TcpReadIncomplete, s.safe_read, 10)
2015-06-05 17:39:15 +00:00
2016-02-01 18:24:30 +00:00
class TestPeek(tservers.ServerTestBase):
handler = EchoHandler
2016-02-01 19:10:18 +00:00
def _connect(self, c):
return c.connect()
2016-02-01 19:10:18 +00:00
2016-02-01 18:24:30 +00:00
def test_peek(self):
testval = b"peek!\n"
c = tcp.TCPClient(("127.0.0.1", self.port))
with self._connect(c):
c.wfile.write(testval)
c.wfile.flush()
2016-02-01 18:24:30 +00:00
assert c.rfile.peek(4) == b"peek"
assert c.rfile.peek(6) == b"peek!\n"
assert c.rfile.readline() == testval
2016-02-01 19:10:18 +00:00
c.close()
2016-10-19 22:02:52 +00:00
with tutils.raises(exceptions.NetlibException):
if c.rfile.peek(1) == b"":
# Workaround for Python 2 on Unix:
# Peeking a closed connection does not raise an exception here.
2016-10-19 22:02:52 +00:00
raise exceptions.NetlibException()
2016-02-01 19:10:18 +00:00
class TestPeekSSL(TestPeek):
ssl = True
def _connect(self, c):
with c.connect() as conn:
c.convert_to_ssl()
return conn.pop()
2016-02-01 18:24:30 +00:00
class TestAddress:
def test_simple(self):
2016-02-18 23:30:37 +00:00
a = tcp.Address(("localhost", 80), True)
assert a.use_ipv6
2016-02-18 23:30:37 +00:00
b = tcp.Address(("foo.com", 80), True)
assert not a == b
2016-02-18 23:30:37 +00:00
c = tcp.Address(("localhost", 80), True)
assert a == c
2015-02-27 21:02:52 +00:00
assert not a != c
2016-02-18 23:30:37 +00:00
assert repr(a) == "localhost:80"
2015-02-27 21:02:52 +00:00
2015-06-22 02:52:23 +00:00
class TestSSLKeyLogger(tservers.ServerTestBase):
2015-02-27 21:02:52 +00:00
handler = EchoHandler
ssl = dict(
cipher_list="AES256-SHA"
)
2015-02-27 21:02:52 +00:00
def test_log(self):
2015-09-20 18:35:45 +00:00
testval = b"echo!\n"
2015-02-27 21:02:52 +00:00
_logfun = tcp.log_ssl_key
with tutils.tmpdir() as d:
logfile = os.path.join(d, "foo", "bar", "logfile")
tcp.log_ssl_key = tcp.SSLKeyLogger(logfile)
2015-02-27 21:02:52 +00:00
c = tcp.TCPClient(("127.0.0.1", self.port))
with c.connect():
c.convert_to_ssl()
c.wfile.write(testval)
c.wfile.flush()
assert c.rfile.readline() == testval
c.finish()
tcp.log_ssl_key.close()
with open(logfile, "rb") as f:
assert f.read().count(b"CLIENT_RANDOM") == 2
2015-02-27 21:02:52 +00:00
tcp.log_ssl_key = _logfun
def test_create_logfun(self):
2015-05-30 00:02:58 +00:00
assert isinstance(
tcp.SSLKeyLogger.create_logfun("test"),
tcp.SSLKeyLogger)
assert not tcp.SSLKeyLogger.create_logfun(False)