Merge pull request #1615 from cortesi/python3a

exterminate six
This commit is contained in:
Aldo Cortesi 2016-10-17 17:03:02 +13:00 committed by GitHub
commit ae3ff8ee1e
60 changed files with 208 additions and 336 deletions

View File

@ -1,5 +1,5 @@
import re
from six.moves import urllib
import urllib
# set of SSL/TLS capable hosts
secure_hosts = set()

View File

@ -1,13 +1,13 @@
from six.moves import cStringIO as StringIO
import io
from PIL import Image
def response(flow):
if flow.response.headers.get("content-type", "").startswith("image"):
try:
s = StringIO(flow.response.content)
s = io.StringIO(flow.response.content)
img = Image.open(s).rotate(180)
s2 = StringIO()
s2 = io.StringIO()
img.save(s2, "png")
flow.response.content = s2.getvalue()
flow.response.headers["content-type"] = "image/png"

View File

@ -1,5 +1,5 @@
from __future__ import absolute_import, print_function, division
from six.moves import urllib
import urllib
import hashlib
from netlib import strutils

View File

@ -1,5 +1,6 @@
import collections
from six.moves import http_cookiejar
from http import cookiejar
from netlib.http import cookies
from mitmproxy import exceptions
@ -20,9 +21,9 @@ def ckey(attrs, f):
def domain_match(a, b):
if http_cookiejar.domain_match(a, b):
if cookiejar.domain_match(a, b):
return True
elif http_cookiejar.domain_match(a, b.strip(".")):
elif cookiejar.domain_match(a, b.strip(".")):
return True
return False

View File

@ -2,7 +2,6 @@ from __future__ import absolute_import, print_function, division
import abc
import copy
import six
import urwid
from mitmproxy.console import common
from mitmproxy.console import signals
@ -24,8 +23,7 @@ FOOTER_EDITING = [
]
@six.add_metaclass(abc.ABCMeta)
class Column(object):
class Column(object, metaclass=abc.ABCMeta):
subeditor = None
def __init__(self, heading):

View File

@ -13,7 +13,6 @@ import tempfile
import traceback
import weakref
import six
import urwid
from typing import Optional # noqa
@ -357,13 +356,8 @@ class ConsoleMaster(flow.FlowMaster):
def spawn_editor(self, data):
text = not isinstance(data, bytes)
fd, name = tempfile.mkstemp('', "mproxy", text=text)
if six.PY2:
os.close(fd)
with open(name, "w" if text else "wb") as f:
f.write(data)
else:
with open(fd, "w" if text else "wb") as f:
f.write(data)
with open(fd, "w" if text else "wb") as f:
f.write(data)
# if no EDITOR is set, assume 'vi'
c = os.environ.get("EDITOR") or "vi"
cmd = shlex.split(c)

View File

@ -16,24 +16,26 @@ from __future__ import absolute_import, print_function, division
import cssutils
import datetime
import html2text
import jsbeautifier
import json
import logging
import lxml.etree
import lxml.html
import subprocess
import traceback
import io
from typing import Mapping # noqa
import lxml.etree
import lxml.html
from PIL import ExifTags
from PIL import Image
import html2text
import jsbeautifier
from mitmproxy import exceptions
from mitmproxy.contrib.wbxml import ASCommandResponse
from netlib import http
from netlib import multidict
from netlib import strutils
from netlib.http import url
from six import BytesIO
from typing import Mapping # noqa
try:
import pyamf
@ -422,7 +424,7 @@ class ViewImage(View):
def __call__(self, data, **metadata):
try:
img = Image.open(BytesIO(data))
img = Image.open(io.BytesIO(data))
except IOError:
return None
parts = [

View File

@ -6,15 +6,12 @@ from __future__ import absolute_import, division, print_function
import construct
import six
class _UBInt24(construct.Adapter):
def _encode(self, obj, context):
return (
six.int2byte((obj & 0xFF0000) >> 16) +
six.int2byte((obj & 0x00FF00) >> 8) +
six.int2byte(obj & 0x0000FF)
return bytes(
(obj & 0xFF0000) >> 16,
(obj & 0x00FF00) >> 8,
obj & 0x0000FF
)
def _decode(self, obj, context):

View File

@ -41,7 +41,6 @@ all other strings are returned as plain bytes.
"""
import collections
import six
from typing import io, Union, Tuple # noqa
TSerializable = Union[None, bool, int, float, bytes, list, tuple, dict]
@ -96,7 +95,7 @@ def _rdumpq(q, size, value):
elif value is False:
write(b'5:false!')
return size + 8
elif isinstance(value, six.integer_types):
elif isinstance(value, int):
data = str(value).encode()
ldata = len(data)
span = str(ldata).encode()
@ -191,16 +190,12 @@ def load(file_handle):
def parse(data_type, data):
# type: (int, bytes) -> TSerializable
if six.PY2:
data_type = ord(data_type)
if data_type == ord(b','):
return data
if data_type == ord(b';'):
return data.decode("utf8")
if data_type == ord(b'#'):
try:
if six.PY2:
return long(data)
return int(data)
except ValueError:
raise ValueError("not a tnetstring: invalid integer literal: {}".format(data))

View File

@ -5,7 +5,7 @@
Inspired by EAS Inspector for Fiddler
https://easinspectorforfiddler.codeplex.com
----- The MIT License (MIT) -----
----- The MIT License (MIT) -----
Filename: ASWBXMLByteQueue.py
Copyright (c) 2014, David P. Shaw
@ -27,25 +27,25 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
'''
from six.moves.queue import Queue
from queue import Queue
import logging
class ASWBXMLByteQueue(Queue):
def __init__(self, wbxmlBytes):
self.bytesDequeued = 0
self.bytesEnqueued = 0
Queue.__init__(self)
for byte in wbxmlBytes:
self.put(ord(byte))
self.bytesEnqueued += 1
logging.debug("Array byte count: %d, enqueued: %d" % (self.qsize(), self.bytesEnqueued))
"""
Created to debug the dequeueing of bytes
"""
@ -54,18 +54,18 @@ class ASWBXMLByteQueue(Queue):
self.bytesDequeued += 1
logging.debug("Dequeued byte 0x{0:X} ({1} total)".format(singleByte, self.bytesDequeued))
return singleByte
"""
Return true if the continuation bit is set in the byte
"""
def checkContinuationBit(self, byteval):
continuationBitmask = 0x80
return (continuationBitmask & byteval) != 0
def dequeueMultibyteInt(self):
iReturn = 0
singleByte = 0xFF
while True:
iReturn <<= 7
if (self.qsize() == 0):
@ -100,4 +100,3 @@ class ASWBXMLByteQueue(Queue):
break
return strReturn

View File

@ -3,8 +3,7 @@ from __future__ import absolute_import, print_function, division
import functools
import threading
import contextlib
from six.moves import queue
import queue
from mitmproxy import addons
from mitmproxy import options

View File

@ -2,21 +2,15 @@ from __future__ import absolute_import, print_function, division
import json
import re
from textwrap import dedent
import six
from six.moves import urllib
import textwrap
import urllib
import netlib.http
def _native(s):
if six.PY2:
if isinstance(s, str):
return s.encode()
else:
if isinstance(s, bytes):
return s.decode()
if isinstance(s, bytes):
return s.decode()
return s
@ -49,7 +43,7 @@ def curl_command(flow):
def python_code(flow):
code = dedent("""
code = textwrap.dedent("""
import requests
url = '{url}'
@ -110,7 +104,7 @@ def is_json(headers, content):
def locust_code(flow):
code = dedent("""
code = textwrap.dedent("""
from locust import HttpLocust, TaskSet, task
class UserBehavior(TaskSet):

View File

@ -3,7 +3,6 @@ This module handles the import of mitmproxy flows generated by old versions.
"""
from __future__ import absolute_import, print_function, division
import six
from typing import Any # noqa
from netlib import version, strutils
@ -93,23 +92,21 @@ def _convert_dict_vals(o, values_to_convert):
def convert_unicode(data):
# type: (dict) -> dict
"""
The Python 2 version of mitmproxy serializes everything as bytes.
This method converts between Python 3 and Python 2 dumpfiles.
"""
if not six.PY2:
data = _convert_dict_keys(data)
data = _convert_dict_vals(
data, {
"type": True,
"id": True,
"request": {
"first_line_format": True
},
"error": {
"msg": True
}
data = _convert_dict_keys(data)
data = _convert_dict_vals(
data, {
"type": True,
"id": True,
"request": {
"first_line_format": True
},
"error": {
"msg": True
}
)
}
)
return data

View File

@ -2,15 +2,13 @@ from __future__ import absolute_import, print_function, division
from abc import abstractmethod, ABCMeta
import six
from typing import List # noqa
from mitmproxy import flowfilter
from mitmproxy import models # noqa
@six.add_metaclass(ABCMeta)
class FlowList(object):
class FlowList(metaclass=ABCMeta):
def __init__(self):
self._list = [] # type: List[models.Flow]
@ -26,9 +24,6 @@ class FlowList(object):
def __bool__(self):
return bool(self._list)
if six.PY2:
__nonzero__ = __bool__
def __len__(self):
return len(self._list)

View File

@ -4,8 +4,6 @@ import os
import signal
import sys
from six.moves import _thread # PY3: We only need _thread.error, which is an alias of RuntimeError in 3.3+
from mitmproxy import cmdline
from mitmproxy import exceptions
from mitmproxy.proxy import config
@ -78,7 +76,7 @@ def mitmproxy(args=None): # pragma: no cover
sys.exit(1)
try:
m.run()
except (KeyboardInterrupt, _thread.error):
except (KeyboardInterrupt, RuntimeError):
pass
@ -109,7 +107,7 @@ def mitmdump(args=None): # pragma: no cover
except (dump.DumpError, exceptions.OptionsError) as e:
print("mitmdump: %s" % e, file=sys.stderr)
sys.exit(1)
except (KeyboardInterrupt, _thread.error):
except (KeyboardInterrupt, RuntimeError):
pass
if master is None or master.has_errored:
print("mitmdump: errors occurred during run", file=sys.stderr)
@ -142,5 +140,5 @@ def mitmweb(args=None): # pragma: no cover
sys.exit(1)
try:
m.run()
except (KeyboardInterrupt, _thread.error):
except (KeyboardInterrupt, RuntimeError):
pass

View File

@ -4,8 +4,6 @@ import time
import copy
import os
import six
from mitmproxy import stateobject
from netlib import certutils
from netlib import tcp
@ -47,9 +45,6 @@ class ClientConnection(tcp.BaseHandler, stateobject.StateObject):
def __bool__(self):
return bool(self.connection) and not self.finished
if six.PY2:
__nonzero__ = __bool__
def __repr__(self):
return "<ClientConnection: {ssl}{address}>".format(
ssl="[ssl] " if self.ssl_established else "",
@ -136,9 +131,6 @@ class ServerConnection(tcp.TCPClient, stateobject.StateObject):
def __bool__(self):
return bool(self.connection) and not self.finished
if six.PY2:
__nonzero__ = __bool__
def __repr__(self):
if self.ssl_established and self.sni:
ssl = "[ssl: {0}] ".format(self.sni)

View File

@ -10,8 +10,8 @@ import time
import configargparse
from pydivert import enum
from pydivert import windivert
from six.moves import cPickle as pickle
from six.moves import socketserver
import pickle
import socketserver
PROXY_API_PORT = 8085

View File

@ -1,9 +1,5 @@
from __future__ import absolute_import, print_function, division
import sys
import six
import netlib.exceptions
from mitmproxy import exceptions
from mitmproxy import models
@ -184,12 +180,8 @@ class ServerConnectionMixin(object):
try:
self.server_conn.connect()
except netlib.exceptions.TcpException as e:
six.reraise(
exceptions.ProtocolException,
exceptions.ProtocolException(
"Server connection to {} failed: {}".format(
repr(self.server_conn.address), str(e)
)
),
sys.exc_info()[2]
raise exceptions.ProtocolException(
"Server connection to {} failed: {}".format(
repr(self.server_conn.address), str(e)
)
)

View File

@ -2,8 +2,6 @@ from __future__ import absolute_import, print_function, division
import h2.exceptions
import netlib.exceptions
import six
import sys
import time
import traceback
from mitmproxy import exceptions
@ -83,9 +81,6 @@ class ConnectServerConnection(object):
def __bool__(self):
return bool(self.via)
if six.PY2:
__nonzero__ = __bool__
class UpstreamConnectLayer(base.Layer):
@ -159,12 +154,8 @@ class HttpLayer(base.Layer):
# We optimistically guess there might be an HTTP client on the
# other end
self.send_error_response(400, repr(e))
six.reraise(
exceptions.ProtocolException,
exceptions.ProtocolException(
"HTTP protocol error in client request: {}".format(e)
),
sys.exc_info()[2]
raise exceptions.ProtocolException(
"HTTP protocol error in client request: {}".format(e)
)
self.log("request", "debug", [repr(request)])
@ -193,10 +184,7 @@ class HttpLayer(base.Layer):
# update host header in reverse proxy mode
if self.config.options.mode == "reverse":
if six.PY2:
flow.request.headers["Host"] = self.config.upstream_server.address.host.encode()
else:
flow.request.headers["Host"] = self.config.upstream_server.address.host
flow.request.headers["Host"] = self.config.upstream_server.address.host
# set upstream auth
if self.mode == "upstream" and self.config.upstream_auth is not None:
@ -244,8 +232,9 @@ class HttpLayer(base.Layer):
self.channel.ask("error", flow)
return
else:
six.reraise(exceptions.ProtocolException, exceptions.ProtocolException(
"Error in HTTP connection: %s" % repr(e)), sys.exc_info()[2])
raise exceptions.ProtocolException(
"Error in HTTP connection: %s" % repr(e)
)
finally:
if flow:
flow.live = False

View File

@ -6,10 +6,9 @@ import traceback
import functools
import h2.exceptions
import six
from h2 import connection
from h2 import events
from six.moves import queue
import queue
import netlib.exceptions
from mitmproxy import exceptions
@ -208,7 +207,7 @@ class Http2Layer(base.Layer):
return True
def _handle_remote_settings_changed(self, event, other_conn):
new_settings = dict([(key, cs.new_value) for (key, cs) in six.iteritems(event.changed_settings)])
new_settings = dict([(key, cs.new_value) for (key, cs) in event.changed_settings.items()])
other_conn.h2.safe_update_settings(new_settings)
return True

View File

@ -1,10 +1,8 @@
from __future__ import absolute_import, print_function, division
import struct
import sys
import construct
import six
import netlib.exceptions
from mitmproxy import exceptions
@ -214,20 +212,12 @@ def is_tls_record_magic(d):
# TLS ClientHello magic, works for SSLv3, TLSv1.0, TLSv1.1, TLSv1.2
# http://www.moserware.com/2009/06/first-few-milliseconds-of-https.html#client-hello
if six.PY2:
return (
len(d) == 3 and
d[0] == '\x16' and
d[1] == '\x03' and
d[2] in ('\x00', '\x01', '\x02', '\x03')
)
else:
return (
len(d) == 3 and
d[0] == 0x16 and
d[1] == 0x03 and
0x0 <= d[2] <= 0x03
)
return (
len(d) == 3 and
d[0] == 0x16 and
d[1] == 0x03 and
0x0 <= d[2] <= 0x03
)
def get_client_hello(client_conn):
@ -467,7 +457,7 @@ class TlsLayer(base.Layer):
self._establish_tls_with_client()
except:
pass
six.reraise(*sys.exc_info())
raise
self._establish_tls_with_client()
@ -497,15 +487,11 @@ class TlsLayer(base.Layer):
# raises ann error.
self.client_conn.rfile.peek(1)
except netlib.exceptions.TlsException as e:
six.reraise(
exceptions.ClientHandshakeException,
exceptions.ClientHandshakeException(
"Cannot establish TLS with client (sni: {sni}): {e}".format(
sni=self._client_hello.sni, e=repr(e)
),
self._client_hello.sni or repr(self.server_conn.address)
raise exceptions.ClientHandshakeException(
"Cannot establish TLS with client (sni: {sni}): {e}".format(
sni=self._client_hello.sni, e=repr(e)
),
sys.exc_info()[2]
self._client_hello.sni or repr(self.server_conn.address)
)
def _establish_tls_with_server(self):
@ -545,20 +531,14 @@ class TlsLayer(base.Layer):
self.log(str(tls_cert_err), "warn")
self.log("Ignoring server verification error, continuing with connection", "warn")
except netlib.exceptions.InvalidCertificateException as e:
six.reraise(
exceptions.InvalidServerCertificate,
exceptions.InvalidServerCertificate(str(e)),
sys.exc_info()[2]
)
raise exceptions.InvalidServerCertificate(str(e))
except netlib.exceptions.TlsException as e:
six.reraise(
exceptions.TlsProtocolException,
exceptions.TlsProtocolException("Cannot establish TLS with {address} (sni: {sni}): {e}".format(
raise exceptions.TlsProtocolException(
"Cannot establish TLS with {address} (sni: {sni}): {e}".format(
address=repr(self.server_conn.address),
sni=self.server_sni,
e=repr(e),
)),
sys.exc_info()[2]
e=repr(e)
)
)
proto = self.alpn_for_client_connection.decode() if self.alpn_for_client_connection else '-'

View File

@ -6,7 +6,6 @@ import os
import re
from netlib import strutils
import six
from OpenSSL import SSL, crypto
from mitmproxy import exceptions
@ -38,9 +37,6 @@ class HostMatcher(object):
def __bool__(self):
return bool(self.patterns)
if six.PY2:
__nonzero__ = __bool__
ServerSpec = collections.namedtuple("ServerSpec", "scheme address")

View File

@ -1,9 +1,5 @@
from __future__ import absolute_import, print_function, division
import sys
import six
import netlib.exceptions
from mitmproxy import controller
from mitmproxy import exceptions
@ -50,7 +46,7 @@ class RootContext(object):
try:
d = top_layer.client_conn.rfile.peek(3)
except netlib.exceptions.TcpException as e:
six.reraise(exceptions.ProtocolException, exceptions.ProtocolException(str(e)), sys.exc_info()[2])
raise exceptions.ProtocolException(str(e))
client_tls = protocol.is_tls_record_magic(d)
# 1. check for --ignore
@ -101,7 +97,7 @@ class RootContext(object):
is_ascii = (
len(d) == 3 and
# expect A-Za-z
all(65 <= x <= 90 or 97 <= x <= 122 for x in six.iterbytes(d))
all(65 <= x <= 90 or 97 <= x <= 122 for x in d)
)
if self.config.options.rawtcp and not is_ascii:
return protocol.RawTCPLayer(top_layer)

View File

@ -4,8 +4,6 @@ import socket
import sys
import traceback
import six
import netlib.exceptions
from mitmproxy import exceptions
from mitmproxy import models
@ -46,10 +44,8 @@ class ProxyServer(tcp.TCPServer):
(config.options.listen_host, config.options.listen_port)
)
except socket.error as e:
six.reraise(
exceptions.ServerException,
exceptions.ServerException('Error starting proxy server: ' + repr(e)),
sys.exc_info()[2]
raise exceptions.ServerException(
'Error starting proxy server: ' + repr(e)
)
self.channel = None

View File

@ -1,6 +1,5 @@
from __future__ import absolute_import, print_function, division
import six
from typing import Any
from typing import List
@ -34,7 +33,7 @@ class StateObject(netlib.basetypes.Serializable):
Retrieve object state.
"""
state = {}
for attr, cls in six.iteritems(self._stateobject_attributes):
for attr, cls in self._stateobject_attributes.items():
val = getattr(self, attr)
if val is None:
state[attr] = None
@ -51,7 +50,7 @@ class StateObject(netlib.basetypes.Serializable):
Load object state from data returned by a get_state call.
"""
state = state.copy()
for attr, cls in six.iteritems(self._stateobject_attributes):
for attr, cls in self._stateobject_attributes.items():
val = state.pop(attr)
if val is None:
setattr(self, attr, val)

View File

@ -8,7 +8,6 @@ import re
import hashlib
import six
import tornado.websocket
import tornado.web
from io import BytesIO
@ -242,10 +241,10 @@ class FlowHandler(RequestHandler):
def put(self, flow_id):
flow = self.flow
flow.backup()
for a, b in six.iteritems(self.json):
for a, b in self.json.items():
if a == "request":
request = flow.request
for k, v in six.iteritems(b):
for k, v in b.items():
if k in ["method", "scheme", "host", "path", "http_version"]:
setattr(request, k, str(v))
elif k == "port":
@ -259,7 +258,7 @@ class FlowHandler(RequestHandler):
elif a == "response":
response = flow.response
for k, v in six.iteritems(b):
for k, v in b.items():
if k == "msg":
response.msg = str(v)
elif k == "code":
@ -387,7 +386,7 @@ class Settings(RequestHandler):
def put(self):
update = {}
for k, v in six.iteritems(self.json):
for k, v in self.json.items():
if k == "intercept":
self.master.options.intercept = v
update[k] = v

View File

@ -3,7 +3,6 @@ from __future__ import absolute_import
import itertools
import time
from six.moves import range
import pyparsing as pp
from . import http, http2, websockets, writer, exceptions

View File

@ -1,9 +1,9 @@
import operator
import os
import abc
import functools
import pyparsing as pp
from six.moves import reduce
from netlib import strutils
from netlib import human
@ -171,14 +171,14 @@ class TokValueGenerate(Token):
def expr(cls):
e = pp.Literal("@").suppress() + v_integer
u = reduce(
u = functools.reduce(
operator.or_,
[pp.Literal(i) for i in human.SIZE_UNITS.keys()]
).leaveWhitespace()
e = e + pp.Optional(u, default=None)
s = pp.Literal(",").suppress()
s += reduce(
s += functools.reduce(
operator.or_,
[pp.Literal(i) for i in generators.DATATYPES.keys()]
)

View File

@ -2,7 +2,6 @@ import string
import random
import mmap
import six
import sys
DATATYPES = dict(
@ -52,8 +51,6 @@ def rand_byte(chars):
"""
# bytearray has consistent behaviour on both Python 2 and 3
# while bytes does not
if six.PY2:
return random.choice(chars)
return bytes([random.choice(chars)])

View File

@ -1,7 +1,5 @@
import time
import six
from netlib import strutils, human
@ -52,7 +50,7 @@ class LogCtx(object):
timestamp = self.timestamp
)
if exc_value:
six.reraise(exc_type, exc_value, traceback)
raise exc_value
def suppress(self):
self.suppressed = True

View File

@ -4,13 +4,12 @@ import sys
import os
import itertools
import hashlib
from six.moves import queue
import queue
import random
import select
import time
import OpenSSL.crypto
import six
import logging
from netlib.tutils import treq
@ -250,9 +249,9 @@ class Pathoc(tcp.TCPClient):
if resp.status_code != 200:
raise exceptions.HttpException("Unexpected status code: %s" % resp.status_code)
except exceptions.HttpException as e:
six.reraise(PathocError, PathocError(
raise PathocError(
"Proxy CONNECT failed: %s" % repr(e)
))
)
def socks_connect(self, connect_to):
try:

View File

@ -10,7 +10,7 @@ from netlib import certutils
from netlib import websockets
from netlib import version
from six.moves import urllib
import urllib
from netlib.exceptions import HttpException, HttpReadDisconnect, TcpTimeout, TcpDisconnect, \
TlsException

View File

@ -1,7 +1,6 @@
from six.moves import cStringIO as StringIO
import io
import time
from six.moves import queue
import queue
from . import pathod
from netlib import basethread
@ -12,7 +11,7 @@ class Daemon:
def __init__(self, ssl=None, **daemonargs):
self.q = queue.Queue()
self.logfp = StringIO()
self.logfp = io.StringIO()
daemonargs["logfp"] = self.logfp
self.thread = _PaThread(self.IFACE, self.q, ssl, daemonargs)
self.thread.start()

View File

@ -9,7 +9,6 @@ setup(
"twine>=1.6.5, <1.8",
"virtualenv>=14.0.5, <15.1",
"wheel>=0.29.0, <0.30",
"six>=1.10.0, <1.11",
"pysftp>=0.2.8, !=0.2.9, <0.3",
],
entry_points={

View File

@ -79,7 +79,6 @@ setup(
"pyparsing>=2.1.3, <2.2",
"pyperclip>=1.5.22, <1.6",
"requests>=2.9.1, <2.12",
"six>=1.10, <1.11",
"tornado>=4.3, <4.5",
"urwid>=1.3.1, <1.4",
"watchdog>=0.8.3, <0.9",

View File

@ -1,5 +1,6 @@
import io
from .. import tutils, mastertest
from six.moves import cStringIO as StringIO
from mitmproxy.builtins import dumper
from mitmproxy.flow import state
@ -13,7 +14,7 @@ import mock
class TestDumper(mastertest.MasterTest):
def test_simple(self):
d = dumper.Dumper()
sio = StringIO()
sio = io.StringIO()
updated = {"tfile", "flow_detail"}
d.configure(dump.Options(tfile = sio, flow_detail = 0), updated)
@ -24,17 +25,17 @@ class TestDumper(mastertest.MasterTest):
d.response(tutils.tflow())
assert sio.getvalue()
sio = StringIO()
sio = io.StringIO()
d.configure(dump.Options(tfile = sio, flow_detail = 4), updated)
d.response(tutils.tflow(resp=True))
assert "<<" in sio.getvalue()
sio = StringIO()
sio = io.StringIO()
d.configure(dump.Options(tfile = sio, flow_detail = 4), updated)
d.response(tutils.tflow(err=True))
assert "<<" in sio.getvalue()
sio = StringIO()
sio = io.StringIO()
d.configure(dump.Options(tfile = sio, flow_detail = 4), updated)
flow = tutils.tflow()
flow.request = netlib.tutils.treq()
@ -47,7 +48,7 @@ class TestDumper(mastertest.MasterTest):
d.response(flow)
assert sio.getvalue()
sio = StringIO()
sio = io.StringIO()
d.configure(dump.Options(tfile = sio, flow_detail = 4), updated)
flow = tutils.tflow(resp=netlib.tutils.tresp(content=b"{"))
flow.response.headers["content-type"] = "application/json"
@ -55,7 +56,7 @@ class TestDumper(mastertest.MasterTest):
d.response(flow)
assert sio.getvalue()
sio = StringIO()
sio = io.StringIO()
d.configure(dump.Options(tfile = sio), updated)
flow = tutils.tflow()
flow.request.content = None
@ -71,7 +72,7 @@ class TestContentView(mastertest.MasterTest):
view_auto.side_effect = exceptions.ContentViewException("")
s = state.State()
sio = StringIO()
sio = io.StringIO()
o = dump.Options(
flow_detail=4,
verbosity=3,

View File

@ -1,5 +1,5 @@
from .. import mastertest
from six.moves import cStringIO as StringIO
import io
from mitmproxy.builtins import termlog
from mitmproxy import controller
@ -9,7 +9,7 @@ from mitmproxy import dump
class TestTermLog(mastertest.MasterTest):
def test_simple(self):
t = termlog.TermLog()
sio = StringIO()
sio = io.StringIO()
t.configure(dump.Options(tfile = sio, verbosity = 2), set([]))
t.log(controller.LogEntry("one", "info"))
assert "one" in sio.getvalue()

View File

@ -4,7 +4,7 @@ from threading import Thread, Event
from mock import Mock
from mitmproxy import controller
from six.moves import queue
import queue
from mitmproxy.exceptions import Kill, ControlException
from mitmproxy.proxy import DummyServer

View File

@ -1,5 +1,5 @@
import os
from six.moves import cStringIO as StringIO
import io
from mitmproxy import dump, flow, exceptions
from . import tutils, mastertest
@ -15,7 +15,7 @@ class TestDumpMaster(mastertest.MasterTest):
options["verbosity"] = 0
if "flow_detail" not in options:
options["flow_detail"] = 0
o = dump.Options(filtstr=flt, tfile=StringIO(), **options)
o = dump.Options(filtstr=flt, tfile=io.StringIO(), **options)
return dump.DumpMaster(None, o)
def test_basic(self):
@ -38,7 +38,7 @@ class TestDumpMaster(mastertest.MasterTest):
def test_error(self):
o = dump.Options(
tfile=StringIO(),
tfile=io.StringIO(),
flow_detail=1
)
m = dump.DumpMaster(None, o)
@ -107,7 +107,7 @@ class TestDumpMaster(mastertest.MasterTest):
def test_replacements(self):
o = dump.Options(
replacements=[(".*", "content", "foo")],
tfile = StringIO(),
tfile = io.StringIO(),
)
o.verbosity = 0
o.flow_detail = 0
@ -118,7 +118,7 @@ class TestDumpMaster(mastertest.MasterTest):
def test_setheader(self):
o = dump.Options(
setheaders=[(".*", "one", "two")],
tfile=StringIO()
tfile=io.StringIO()
)
o.verbosity = 0
o.flow_detail = 0

View File

@ -1,7 +1,6 @@
import json
import os
import six
import shlex
from mitmproxy import options
from mitmproxy import contentviews
@ -122,7 +121,7 @@ class TestHARDump():
with tutils.tmpdir() as tdir:
path = os.path.join(tdir, "somefile")
m, sc = tscript("har_dump.py", six.moves.shlex_quote(path))
m, sc = tscript("har_dump.py", shlex.quote(path))
m.addons.invoke(m, "response", self.flow())
m.addons.remove(sc)
@ -135,7 +134,7 @@ class TestHARDump():
with tutils.tmpdir() as tdir:
path = os.path.join(tdir, "somefile")
m, sc = tscript("har_dump.py", six.moves.shlex_quote(path))
m, sc = tscript("har_dump.py", shlex.quote(path))
m.addons.invoke(m, "response", self.flow(resp_content=b"foo" + b"\xFF" * 10))
m.addons.remove(sc)

View File

@ -1,4 +1,4 @@
from six.moves import cStringIO as StringIO
import io
from mock import patch
from mitmproxy import flowfilter
@ -9,7 +9,7 @@ from . import tutils
class TestParsing:
def _dump(self, x):
c = StringIO()
c = io.StringIO()
x.dump(fp=c)
assert c.getvalue()

View File

@ -1,4 +1,4 @@
from six.moves import socketserver
import socketserver
from time import sleep

View File

@ -7,7 +7,7 @@ import sys
from contextlib import contextmanager
from unittest.case import SkipTest
from six.moves import cStringIO as StringIO
import io
import netlib.utils
import netlib.tutils
@ -203,7 +203,7 @@ raises = netlib.tutils.raises
@contextmanager
def capture_stderr(command, *args, **kwargs):
out, sys.stderr = sys.stderr, StringIO()
out, sys.stderr = sys.stderr, io.StringIO()
command(*args, **kwargs)
yield sys.stderr.getvalue()
sys.stderr = out

View File

@ -1,8 +1,6 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division
import six
from netlib.tutils import tresp
from netlib import http, tutils
@ -245,7 +243,7 @@ class TestMessageText(object):
with tutils.raises(ValueError):
assert r.text
assert r.get_text(strict=False) == u'\ufffd' if six.PY2 else '\udcff'
assert r.get_text(strict=False) == '\udcff'
def test_cannot_encode(self):
r = tresp()
@ -271,4 +269,4 @@ class TestMessageText(object):
r.headers["content-type"] = "text/html; charset=latin1"
r.text = u'\udcff'
assert r.headers["content-type"] == "text/html; charset=utf-8"
assert r.raw_content == b'\xed\xb3\xbf' if six.PY2 else b"\xFF"
assert r.raw_content == b"\xFF"

View File

@ -1,8 +1,6 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function, division
import six
from netlib.http import Headers
from netlib.tutils import treq, raises
from .test_message import _test_decoded_attr, _test_passthrough_attr
@ -57,10 +55,6 @@ class TestRequestCore(object):
assert req.data.path is None
def test_host(self):
if six.PY2:
from unittest import SkipTest
raise SkipTest()
request = treq()
assert request.host == request.data.host.decode("idna")

View File

@ -1,4 +1,3 @@
import six
from netlib import tutils
from netlib.http import url
@ -58,10 +57,7 @@ def test_unparse():
assert url.unparse("https", "foo.com", 443, "") == "https://foo.com"
if six.PY2:
surrogates = bytes(bytearray(range(256)))
else:
surrogates = bytes(range(256)).decode("utf8", "surrogateescape")
surrogates = bytes(range(256)).decode("utf8", "surrogateescape")
surrogates_quoted = (
'%00%01%02%03%04%05%06%07%08%09%0A%0B%0C%0D%0E%0F'

View File

@ -1,17 +1,17 @@
from __future__ import (absolute_import, print_function, division)
from six.moves import cStringIO as StringIO
import io
from netlib import debug
def test_dump_info():
cs = StringIO()
cs = io.StringIO()
debug.dump_info(None, None, file=cs, testing=True)
assert cs.getvalue()
def test_dump_stacks():
cs = StringIO()
cs = io.StringIO()
debug.dump_stacks(None, None, file=cs, testing=True)
assert cs.getvalue()

View File

@ -1,5 +1,3 @@
import six
from netlib import strutils, tutils
@ -15,12 +13,8 @@ def test_always_bytes():
def test_native():
with tutils.raises(TypeError):
strutils.native(42)
if six.PY2:
assert strutils.native(u"foo") == b"foo"
assert strutils.native(b"foo") == b"foo"
else:
assert strutils.native(u"foo") == u"foo"
assert strutils.native(b"foo") == u"foo"
assert strutils.native(u"foo") == u"foo"
assert strutils.native(b"foo") == u"foo"
def test_escape_control_characters():
@ -40,9 +34,8 @@ def test_escape_control_characters():
u'=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz{|}~.'
)
if not six.PY2:
with tutils.raises(ValueError):
strutils.escape_control_characters(b"foo")
with tutils.raises(ValueError):
strutils.escape_control_characters(b"foo")
def test_bytes_to_escaped_str():
@ -76,12 +69,8 @@ def test_escaped_str_to_bytes():
assert strutils.escaped_str_to_bytes(u"&!?=\\\\)") == br"&!?=\)"
assert strutils.escaped_str_to_bytes(u"\u00fc") == b'\xc3\xbc'
if six.PY2:
with tutils.raises(ValueError):
strutils.escaped_str_to_bytes(42)
else:
with tutils.raises(ValueError):
strutils.escaped_str_to_bytes(b"very byte")
with tutils.raises(ValueError):
strutils.escaped_str_to_bytes(b"very byte")
def test_is_mostly_bin():

View File

@ -1,5 +1,5 @@
from io import BytesIO
from six.moves import queue
import queue
import time
import socket
import random

View File

@ -1,11 +1,11 @@
from io import StringIO
import io
import mock
from netlib import version_check
@mock.patch("sys.exit")
def test_check_pyopenssl_version(sexit):
fp = StringIO()
fp = io.StringIO()
version_check.check_pyopenssl_version(fp=fp)
assert not fp.getvalue()
assert not sexit.called
@ -19,7 +19,7 @@ def test_check_pyopenssl_version(sexit):
@mock.patch("OpenSSL.__version__")
def test_unparseable_pyopenssl_version(version, sexit):
version.split.return_value = ["foo", "bar"]
fp = StringIO()
fp = io.StringIO()
version_check.check_pyopenssl_version(fp=fp)
assert "Cannot parse" in fp.getvalue()
assert not sexit.called

View File

@ -1,8 +1,8 @@
from __future__ import (absolute_import, print_function, division)
import threading
from six.moves import queue
from io import StringIO
import queue
import io
import OpenSSL
from netlib import tcp
@ -80,7 +80,7 @@ class _TServer(tcp.TCPServer):
h.finish()
def handle_error(self, connection, client_address, fp=None):
s = StringIO()
s = io.StringIO()
tcp.TCPServer.handle_error(self, connection, client_address, s)
self.q.put(s.getvalue())

View File

@ -1,4 +1,4 @@
from six import BytesIO
import io
from pathod.language import actions, parse_pathoc, parse_pathod, serve
@ -60,7 +60,7 @@ class TestInject:
assert v.offset == "r"
def test_serve(self):
s = BytesIO()
s = io.BytesIO()
r = next(parse_pathod("400:i0,'foo'"))
assert serve(r, s, {})

View File

@ -1,4 +1,4 @@
from six import BytesIO
import io
from pathod import language
from pathod.language import http, base
@ -10,7 +10,7 @@ def parse_request(s):
def test_make_error_response():
d = BytesIO()
d = io.BytesIO()
s = http.make_error_response("foo")
language.serve(s, d, {})
@ -76,7 +76,7 @@ class TestRequest:
assert r[0].values({})
def test_render(self):
s = BytesIO()
s = io.BytesIO()
r = parse_request("GET:'/foo'")
assert language.serve(
r,
@ -163,7 +163,7 @@ class TestResponse:
assert b"OK" in [i[:] for i in r.preamble({})]
def test_render(self):
s = BytesIO()
s = io.BytesIO()
r = next(language.parse_pathod("400:m'msg'"))
assert language.serve(r, s, {})
@ -173,13 +173,13 @@ class TestResponse:
assert "p0" not in s.spec()
def test_raw(self):
s = BytesIO()
s = io.BytesIO()
r = next(language.parse_pathod("400:b'foo'"))
language.serve(r, s, {})
v = s.getvalue()
assert b"Content-Length" in v
s = BytesIO()
s = io.BytesIO()
r = next(language.parse_pathod("400:b'foo':r"))
language.serve(r, s, {})
v = s.getvalue()
@ -187,7 +187,7 @@ class TestResponse:
def test_length(self):
def testlen(x):
s = BytesIO()
s = io.BytesIO()
x = next(x)
language.serve(x, s, language.Settings())
assert x.length(language.Settings()) == len(s.getvalue())
@ -198,7 +198,7 @@ class TestResponse:
def test_maximum_length(self):
def testlen(x):
x = next(x)
s = BytesIO()
s = io.BytesIO()
m = x.maximum_length({})
language.serve(x, s, {})
assert m >= len(s.getvalue())

View File

@ -1,4 +1,4 @@
from six import BytesIO
import io
from netlib import tcp
from netlib.http import user_agents
@ -26,7 +26,7 @@ def default_settings():
def test_make_error_response():
d = BytesIO()
d = io.BytesIO()
s = http2.make_error_response("foo", "bar")
language.serve(s, d, default_settings())
@ -85,7 +85,7 @@ class TestRequest:
assert r[1].method.string() == b"GET"
def test_render_simple(self):
s = BytesIO()
s = io.BytesIO()
r = parse_request("GET:'/foo'")
assert language.serve(
r,
@ -127,7 +127,7 @@ class TestRequest:
assert r.headers[0].values(default_settings()) == (b"user-agent", user_agents.get_by_shortcut('a')[2].encode())
def test_render_with_headers(self):
s = BytesIO()
s = io.BytesIO()
r = parse_request('GET:/foo:h"foo"="bar"')
assert language.serve(
r,
@ -143,7 +143,7 @@ class TestRequest:
assert r.values(default_settings())
def test_render_with_body(self):
s = BytesIO()
s = io.BytesIO()
r = parse_request("GET:'/foo':bfoobar")
assert language.serve(
r,
@ -200,7 +200,7 @@ class TestResponse:
assert r.body.string() == b"foobar"
def test_render_simple(self):
s = BytesIO()
s = io.BytesIO()
r = parse_response('200')
assert language.serve(
r,
@ -209,7 +209,7 @@ class TestResponse:
)
def test_render_with_headers(self):
s = BytesIO()
s = io.BytesIO()
r = parse_response('200:h"foo"="bar"')
assert language.serve(
r,
@ -218,7 +218,7 @@ class TestResponse:
)
def test_render_with_body(self):
s = BytesIO()
s = io.BytesIO()
r = parse_response('200:bfoobar')
assert language.serve(
r,

View File

@ -1,4 +1,4 @@
from six import BytesIO
import io
from pathod import language
from pathod.language import writer
@ -6,12 +6,12 @@ from pathod.language import writer
def test_send_chunk():
v = b"foobarfoobar"
for bs in range(1, len(v) + 2):
s = BytesIO()
s = io.BytesIO()
writer.send_chunk(s, v, bs, 0, len(v))
assert s.getvalue() == v
for start in range(len(v)):
for end in range(len(v)):
s = BytesIO()
s = io.BytesIO()
writer.send_chunk(s, v, bs, start, end)
assert s.getvalue() == v[start:end]
@ -19,21 +19,21 @@ def test_send_chunk():
def test_write_values_inject():
tst = b"foo"
s = BytesIO()
s = io.BytesIO()
writer.write_values(s, [tst], [(0, "inject", b"aaa")], blocksize=5)
assert s.getvalue() == b"aaafoo"
s = BytesIO()
s = io.BytesIO()
writer.write_values(s, [tst], [(1, "inject", b"aaa")], blocksize=5)
assert s.getvalue() == b"faaaoo"
s = BytesIO()
s = io.BytesIO()
writer.write_values(s, [tst], [(1, "inject", b"aaa")], blocksize=5)
assert s.getvalue() == b"faaaoo"
def test_write_values_disconnects():
s = BytesIO()
s = io.BytesIO()
tst = b"foo" * 100
writer.write_values(s, [tst], [(0, "disconnect")], blocksize=5)
assert not s.getvalue()
@ -41,13 +41,13 @@ def test_write_values_disconnects():
def test_write_values():
tst = b"foobarvoing"
s = BytesIO()
s = io.BytesIO()
writer.write_values(s, [tst], [])
assert s.getvalue() == tst
for bs in range(1, len(tst) + 2):
for off in range(len(tst)):
s = BytesIO()
s = io.BytesIO()
writer.write_values(
s, [tst], [(off, "disconnect")], blocksize=bs
)
@ -57,34 +57,34 @@ def test_write_values():
def test_write_values_pauses():
tst = "".join(str(i) for i in range(10)).encode()
for i in range(2, 10):
s = BytesIO()
s = io.BytesIO()
writer.write_values(
s, [tst], [(2, "pause", 0), (1, "pause", 0)], blocksize=i
)
assert s.getvalue() == tst
for i in range(2, 10):
s = BytesIO()
s = io.BytesIO()
writer.write_values(s, [tst], [(1, "pause", 0)], blocksize=i)
assert s.getvalue() == tst
tst = [tst] * 5
for i in range(2, 10):
s = BytesIO()
s = io.BytesIO()
writer.write_values(s, tst[:], [(1, "pause", 0)], blocksize=i)
assert s.getvalue() == b"".join(tst)
def test_write_values_after():
s = BytesIO()
s = io.BytesIO()
r = next(language.parse_pathod("400:da"))
language.serve(r, s, {})
s = BytesIO()
s = io.BytesIO()
r = next(language.parse_pathod("400:pa,0"))
language.serve(r, s, {})
s = BytesIO()
s = io.BytesIO()
r = next(language.parse_pathod("400:ia,'xx'"))
language.serve(r, s, {})
assert s.getvalue().endswith(b'xx')

View File

@ -1,10 +1,10 @@
import io
from pathod import log
from netlib.exceptions import TcpDisconnect
import six
class DummyIO(six.StringIO):
class DummyIO(io.StringIO):
def start_log(self, *args, **kwargs):
pass

View File

@ -1,5 +1,4 @@
from six.moves import cStringIO as StringIO
from six import BytesIO
import io
from mock import Mock
from netlib import http
@ -21,7 +20,7 @@ def test_response():
class PathocTestDaemon(tutils.DaemonTests):
def tval(self, requests, timeout=None, showssl=False, **kwargs):
s = StringIO()
s = io.StringIO()
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
ssl=self.ssl,
@ -71,7 +70,7 @@ class TestDaemonSSL(PathocTestDaemon):
assert log[0]["request"]["clientcert"]["keyinfo"]
def test_http2_without_ssl(self):
fp = StringIO()
fp = io.StringIO()
c = pathoc.Pathoc(
("127.0.0.1", self.d.port),
use_http2=True,
@ -171,15 +170,15 @@ class TestDaemon(PathocTestDaemon):
def test_connect_fail(self):
to = ("foobar", 80)
c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None)
c.rfile, c.wfile = BytesIO(), BytesIO()
c.rfile, c.wfile = io.BytesIO(), io.BytesIO()
with raises("connect failed"):
c.http_connect(to)
c.rfile = BytesIO(
c.rfile = io.BytesIO(
b"HTTP/1.1 500 OK\r\n"
)
with raises("connect failed"):
c.http_connect(to)
c.rfile = BytesIO(
c.rfile = io.BytesIO(
b"HTTP/1.1 200 OK\r\n"
)
c.http_connect(to)
@ -187,7 +186,7 @@ class TestDaemon(PathocTestDaemon):
def test_socks_connect(self):
to = ("foobar", 80)
c = pathoc.Pathoc(("127.0.0.1", self.d.port), fp=None)
c.rfile, c.wfile = tutils.treader(b""), BytesIO()
c.rfile, c.wfile = tutils.treader(b""), io.BytesIO()
tutils.raises(pathoc.PathocError, c.socks_connect, to)
c.rfile = tutils.treader(

View File

@ -1,4 +1,4 @@
from six.moves import cStringIO as StringIO
import io
import mock
from pathod import pathoc_cmdline as cmdline
@ -9,7 +9,7 @@ from . import tutils
@mock.patch("argparse.ArgumentParser.error")
def test_pathoc(perror):
assert cmdline.args_pathoc(["pathoc", "foo.com", "get:/"])
s = StringIO()
s = io.StringIO()
with tutils.raises(SystemExit):
cmdline.args_pathoc(["pathoc", "--show-uas"], s, s)

View File

@ -1,4 +1,4 @@
from six.moves import cStringIO as StringIO
import io
from pathod import pathod
from netlib import tcp
@ -10,7 +10,7 @@ from . import tutils
class TestPathod(object):
def test_logging(self):
s = StringIO()
s = io.StringIO()
p = pathod.Pathod(("127.0.0.1", 0), logfp=s)
assert len(p.get_log()) == 0
id = p.add_log(dict(s="foo"))

View File

@ -2,9 +2,8 @@ import tempfile
import re
import shutil
import requests
from six.moves import cStringIO as StringIO
from six.moves import urllib
from six import BytesIO
import io
import urllib
from netlib import tcp
from netlib import utils
@ -20,7 +19,7 @@ def treader(bytes):
"""
Construct a tcp.Read object from bytes.
"""
fp = BytesIO(bytes)
fp = io.BytesIO(bytes)
return tcp.Reader(fp)
@ -79,7 +78,7 @@ class DaemonTests(object):
return resp
def getpath(self, path, params=None):
logfp = StringIO()
logfp = io.StringIO()
c = pathoc.Pathoc(
("localhost", self.d.port),
ssl=self.ssl,
@ -92,7 +91,7 @@ class DaemonTests(object):
return resp
def get(self, spec):
logfp = StringIO()
logfp = io.StringIO()
c = pathoc.Pathoc(
("localhost", self.d.port),
ssl=self.ssl,
@ -118,7 +117,7 @@ class DaemonTests(object):
"""
if ssl is None:
ssl = self.ssl
logfp = StringIO()
logfp = io.StringIO()
c = pathoc.Pathoc(
("localhost", self.d.port),
ssl=ssl,
@ -148,6 +147,6 @@ test_data = utils.Data(__name__)
def render(r, settings=language.Settings()):
r = r.resolve(settings)
s = BytesIO()
s = io.BytesIO()
assert language.serve(r, s, settings)
return s.getvalue()