2015-09-26 18:07:11 +00:00
|
|
|
# -*- coding: utf-8 -*-
|
|
|
|
from __future__ import absolute_import, print_function, division
|
|
|
|
|
2016-07-02 08:51:47 +00:00
|
|
|
import six
|
|
|
|
|
2016-05-28 12:36:43 +00:00
|
|
|
from netlib.tutils import tresp
|
2016-07-04 20:58:09 +00:00
|
|
|
from netlib import http, tutils
|
2015-09-26 18:07:11 +00:00
|
|
|
|
|
|
|
|
|
|
|
def _test_passthrough_attr(message, attr):
    """Check that ``message.<attr>`` is a plain view onto ``message.data.<attr>``.

    :param message: object exposing the attribute both directly and on its
        ``data`` member.
    :param attr: name of the attribute to exercise.
    :raises AssertionError: if reading through ``message`` differs from the
        value on ``message.data``, or if assigning through ``message`` does
        not store the value unchanged on ``message.data``.
    """
    # Read path: both access routes must agree.
    assert getattr(message, attr) == getattr(message.data, attr)
    # Write path: the assigned value must land on ``data`` untouched.
    setattr(message, attr, b"foo")
    assert getattr(message.data, attr) == b"foo"
|
2015-09-26 18:07:11 +00:00
|
|
|
|
|
|
|
|
|
|
|
def _test_decoded_attr(message, attr):
    """Check that ``message.<attr>`` is a text view of the bytes in ``message.data.<attr>``.

    The helper walks through every combination of setting/getting the value
    as text or as raw bytes, including non-ASCII text and undecodable bytes.

    :param message: object exposing the attribute as text directly and as
        bytes on its ``data`` member.
    :param attr: name of the attribute to exercise.
    :raises AssertionError: if any of the round trips below does not hold.
    """
    assert getattr(message, attr) == getattr(message.data, attr).decode("utf8")
    # Set str, get raw bytes
    setattr(message, attr, "foo")
    assert getattr(message.data, attr) == b"foo"
    # Set raw bytes, get decoded
    setattr(message.data, attr, b"BAR")  # use uppercase so that we can also cover request.method
    assert getattr(message, attr) == "BAR"
    # Set bytes, get raw bytes
    setattr(message, attr, b"baz")
    assert getattr(message.data, attr) == b"baz"

    # Set UTF8
    setattr(message, attr, "Non-Autorisé")
    assert getattr(message.data, attr) == b"Non-Autoris\xc3\xa9"
    # Don't fail on garbage
    setattr(message.data, attr, b"FOO\xBF\x00BAR")
    assert getattr(message, attr).startswith("FOO")
    assert getattr(message, attr).endswith("BAR")
    # foo.bar = foo.bar should not cause any side effects.
    d = getattr(message, attr)
    setattr(message, attr, d)
    assert getattr(message.data, attr) == b"FOO\xBF\x00BAR"
|
2015-09-26 22:49:41 +00:00
|
|
|
|
|
|
|
|
|
|
|
class TestMessageData(object):
    """Equality checks for the ``.data`` object carried by a test response."""

    def test_eq_ne(self):
        """``==`` and ``!=`` agree for equal, differing and foreign operands."""
        # Timestamps are pinned so two independently built responses match.
        data = tresp(timestamp_start=42, timestamp_end=42).data
        same = tresp(timestamp_start=42, timestamp_end=42).data
        # Both operators are used explicitly so each one is exercised.
        assert data == same
        assert not data != same

        other = tresp(content=b"foo").data
        assert not data == other
        assert data != other

        # Comparison against an unrelated type must report inequality.
        assert data != 0
|
2015-09-26 18:07:11 +00:00
|
|
|
|
|
|
|
|
|
|
|
class TestMessage(object):
    """Behaviour shared by HTTP messages, exercised through ``tresp()`` responses."""

    def test_init(self):
        """A freshly built response carries a truthy ``data`` object."""
        resp = tresp()
        assert resp.data

    def test_eq_ne(self):
        """``==`` and ``!=`` agree for equal, differing and foreign operands."""
        resp = tresp(timestamp_start=42, timestamp_end=42)
        same = tresp(timestamp_start=42, timestamp_end=42)
        assert resp == same
        assert not resp != same

        other = tresp(timestamp_start=0, timestamp_end=0)
        assert not resp == other
        assert resp != other

        assert resp != 0

    def test_serializable(self):
        """A response rebuilt from its own state compares equal to the original."""
        resp = tresp()
        resp2 = http.Response.from_state(resp.get_state())
        assert resp == resp2

    def test_content_length_update(self):
        """Setting ``content`` updates content-length; setting ``raw_content`` does not."""
        resp = tresp()
        resp.content = b"foo"
        assert resp.data.content == b"foo"
        assert resp.headers["content-length"] == "3"
        resp.content = b""
        assert resp.data.content == b""
        assert resp.headers["content-length"] == "0"
        # raw_content stores the bytes but the header keeps its previous value.
        resp.raw_content = b"bar"
        assert resp.data.content == b"bar"
        assert resp.headers["content-length"] == "0"

    def test_headers(self):
        """``headers`` is passed straight through to ``data``."""
        _test_passthrough_attr(tresp(), "headers")

    def test_timestamp_start(self):
        """``timestamp_start`` is passed straight through to ``data``."""
        _test_passthrough_attr(tresp(), "timestamp_start")

    def test_timestamp_end(self):
        """``timestamp_end`` is passed straight through to ``data``."""
        _test_passthrough_attr(tresp(), "timestamp_end")

    def test_http_version(self):
        """``http_version`` is exposed as text backed by bytes on ``data``."""
        _test_decoded_attr(tresp(), "http_version")

    def test_replace(self):
        """``replace`` rewrites the content, optionally limited by ``count``."""
        r = tresp()
        r.content = b"foofootoo"
        r.replace(b"foo", "gg")
        assert r.content == b"ggggtoo"

        r.content = b"foofootoo"
        # With count=1 only the first occurrence is rewritten.
        r.replace(b"foo", "gg", count=1)
        assert r.content == b"ggfootoo"
|
2015-09-26 18:07:11 +00:00
|
|
|
|
2016-08-31 10:57:22 +00:00
|
|
|
|
2016-07-02 08:51:47 +00:00
|
|
|
class TestMessageContentEncoding(object):
    """Interaction of ``content``, ``raw_content`` and the content-encoding header."""

    def test_simple(self):
        """After ``encode("gzip")`` the raw bytes change but ``content`` reads back decoded."""
        r = tresp()
        assert r.raw_content == b"message"
        assert "content-encoding" not in r.headers
        r.encode("gzip")

        assert r.headers["content-encoding"]
        assert r.raw_content != b"message"
        assert r.content == b"message"
        # Checked again after reading ``content``: the raw bytes are still encoded.
        assert r.raw_content != b"message"

    def test_modify(self):
        """Assigning ``content`` on an encoded message stores encoded raw bytes."""
        r = tresp()
        assert "content-encoding" not in r.headers
        r.encode("gzip")

        r.content = b"foo"
        assert r.raw_content != b"foo"
        r.decode()
        assert r.raw_content == b"foo"

        # ``content`` only accepts bytes; text is rejected.
        with tutils.raises(TypeError):
            r.content = u"foo"

    def test_unknown_ce(self):
        """An unknown content-encoding fails strict reads but not ``strict=False``."""
        r = tresp()
        r.headers["content-encoding"] = "zopfli"
        r.raw_content = b"foo"
        with tutils.raises(ValueError):
            assert r.content
        assert r.headers["content-encoding"]
        assert r.get_content(strict=False) == b"foo"

    def test_cannot_decode(self):
        """Undecodable raw bytes fail strict access; ``strict=False`` returns them as-is."""
        r = tresp()
        r.encode("gzip")
        # Replace the gzip payload with bytes that are not valid gzip.
        r.raw_content = b"foo"
        with tutils.raises(ValueError):
            assert r.content
        assert r.headers["content-encoding"]
        assert r.get_content(strict=False) == b"foo"

        # A failed strict decode must leave bytes and header untouched.
        with tutils.raises(ValueError):
            r.decode()
        assert r.raw_content == b"foo"
        assert "content-encoding" in r.headers

        r.decode(strict=False)
        assert r.content == b"foo"
        assert "content-encoding" not in r.headers

    def test_none(self):
        """``content`` can be ``None``, set to bytes, and reset to ``None``."""
        r = tresp(content=None)
        assert r.content is None
        r.content = b"foo"
        assert r.content is not None
        r.content = None
        assert r.content is None

    def test_cannot_encode(self):
        """Content that cannot be encoded is stored raw and the header is dropped."""
        r = tresp()
        r.encode("gzip")
        r.content = None
        assert r.headers["content-encoding"]
        assert r.raw_content is None

        # Assigning content under an unknown encoding stores it unencoded.
        r.headers["content-encoding"] = "zopfli"
        r.content = b"foo"
        assert "content-encoding" not in r.headers
        assert r.raw_content == b"foo"

        # A failed explicit encode must leave bytes and headers untouched.
        with tutils.raises(ValueError):
            r.encode("zopfli")
        assert r.raw_content == b"foo"
        assert "content-encoding" not in r.headers
|
|
|
|
|
2016-07-02 08:51:47 +00:00
|
|
|
|
|
|
|
class TestMessageText(object):
    """Interaction of ``text``, ``raw_content`` and the content-type charset."""

    def test_simple(self):
        """``text`` decodes the content and follows the declared charset."""
        r = tresp(content=b'\xfc')
        assert r.raw_content == b"\xfc"
        assert r.content == b"\xfc"
        assert r.text == u"ü"

        # Content encoding must be transparent to ``text``.
        r.encode("gzip")
        assert r.text == u"ü"
        r.decode()
        assert r.text == u"ü"

        r.headers["content-type"] = "text/html; charset=latin1"
        r.content = b"\xc3\xbc"
        # latin1 maps every byte to one code point, so the two UTF-8 bytes
        # come back as two characters rather than a single u-umlaut.
        assert r.text == u"\xc3\xbc"
        r.headers["content-type"] = "text/html; charset=utf8"
        assert r.text == u"ü"

    def test_guess_json(self):
        """JSON content without a charset is read back as the expected text."""
        r = tresp(content=b'"\xc3\xbc"')
        r.headers["content-type"] = "application/json"
        assert r.text == u'"ü"'

    def test_none(self):
        """``text`` can be ``None``, set to a string, and reset to ``None``."""
        r = tresp(content=None)
        assert r.text is None
        r.text = u"foo"
        assert r.text is not None
        r.text = None
        assert r.text is None

    def test_modify(self):
        """Assigning ``text`` encodes with the current charset and updates content-length."""
        r = tresp()

        r.text = u"ü"
        assert r.raw_content == b"\xfc"

        r.headers["content-type"] = "text/html; charset=utf8"
        r.text = u"ü"
        assert r.raw_content == b"\xc3\xbc"
        assert r.headers["content-length"] == "2"

    def test_unknown_ce(self):
        """An unknown charset fails strict reads but not ``strict=False``."""
        r = tresp()
        r.headers["content-type"] = "text/html; charset=wtf"
        r.raw_content = b"foo"
        with tutils.raises(ValueError):
            assert r.text == u"foo"
        assert r.get_text(strict=False) == u"foo"

    def test_cannot_decode(self):
        """Bytes invalid for the declared charset fail strict reads only."""
        r = tresp()
        r.headers["content-type"] = "text/html; charset=utf8"
        r.raw_content = b"\xFF"
        with tutils.raises(ValueError):
            assert r.text

        # The conditional must be parenthesised: without the parentheses the
        # comparison binds tighter and Python 3 would only assert that the
        # literal '\udcff' is truthy.
        assert r.get_text(strict=False) == (u'\ufffd' if six.PY2 else '\udcff')

    def test_cannot_encode(self):
        """Text that does not fit the declared charset is stored as UTF-8 and the header rewritten."""
        r = tresp()
        r.content = None
        assert "content-type" not in r.headers
        assert r.raw_content is None

        # Other content-type parameters survive the charset rewrite.
        r.headers["content-type"] = "text/html; charset=latin1; foo=bar"
        r.text = u"☃"
        assert r.headers["content-type"] == "text/html; charset=utf-8; foo=bar"
        assert r.raw_content == b'\xe2\x98\x83'

        r.headers["content-type"] = "gibberish"
        r.text = u"☃"
        assert r.headers["content-type"] == "text/plain; charset=utf-8"
        assert r.raw_content == b'\xe2\x98\x83'

        del r.headers["content-type"]
        r.text = u"☃"
        assert r.headers["content-type"] == "text/plain; charset=utf-8"
        assert r.raw_content == b'\xe2\x98\x83'

        r.headers["content-type"] = "text/html; charset=latin1"
        r.text = u'\udcff'
        assert r.headers["content-type"] == "text/html; charset=utf-8"
        # Parenthesised for the same reason as in test_cannot_decode: the
        # expected bytes differ per Python major version.
        assert r.raw_content == (b'\xed\xb3\xbf' if six.PY2 else b"\xFF")
|