diff --git a/netlib/utils.py b/netlib/utils.py index 905d948fd..7e539977c 100644 --- a/netlib/utils.py +++ b/netlib/utils.py @@ -70,11 +70,12 @@ def getbit(byte, offset): class BiDi: """ A wee utility class for keeping bi-directional mappings, like field - constants in protocols: + constants in protocols. Names are attributes on the object, dict-like + access maps values to names: CONST = BiDi(a=1, b=2) assert CONST.a == 1 - assert CONST[1] == "a" + assert CONST.get_name(1) == "a" """ def __init__(self, **kwargs): self.names = kwargs @@ -89,5 +90,21 @@ class BiDi: return self.names[k] raise AttributeError("No such attribute: %s", k) - def __getitem__(self, k): - return self.values[k] + def get_name(self, n, default=None): + return self.values.get(n, default) + + +def pretty_size(size): + suffixes = [ + ("B", 2**10), + ("kB", 2**20), + ("MB", 2**30), + ] + for suf, lim in suffixes: + if size >= lim: + continue + else: + x = round(size/float(lim/2**10), 2) + if x == int(x): + x = int(x) + return str(x) + suf diff --git a/netlib/websockets.py b/netlib/websockets.py index d358ed535..1d02d6841 100644 --- a/netlib/websockets.py +++ b/netlib/websockets.py @@ -162,16 +162,21 @@ class FrameHeader: raise ValueError("Masking key must be 4 bytes.") def human_readable(self): - return "\n".join([ - ("fin - " + str(self.fin)), - ("rsv1 - " + str(self.rsv1)), - ("rsv2 - " + str(self.rsv2)), - ("rsv3 - " + str(self.rsv3)), - ("opcode - " + str(self.opcode)), - ("mask - " + str(self.mask)), - ("length_code - " + str(self.length_code)), - ("masking_key - " + repr(str(self.masking_key))), - ]) + vals = [ + "wf:", + OPCODE.get_name(self.opcode, hex(self.opcode)).lower() + ] + flags = [] + for i in ["fin", "rsv1", "rsv2", "rsv3", "mask"]: + if getattr(self, i): + flags.append(i) + if flags: + vals.extend([":", "|".join(flags)]) + if self.masking_key: + vals.append(":key=%s"%repr(self.masking_key)) + if self.payload_length: + vals.append(" %s"%utils.pretty_size(self.payload_length)) + return "".join(vals) def to_bytes(self): first_byte = utils.setbit(0, 7, self.fin) @@ -308,17 +313,8 @@ class Frame(object): return cls.from_file(io.BytesIO(bytestring)) def human_readable(self): - return "\n".join([ - ("fin - " + str(self.header.fin)), - ("rsv1 - " + str(self.header.rsv1)), - ("rsv2 - " + str(self.header.rsv2)), - ("rsv3 - " + str(self.header.rsv3)), - ("opcode - " + str(self.header.opcode)), - ("mask - " + str(self.header.mask)), - ("length_code - " + str(self.header.length_code)), - ("masking_key - " + repr(str(self.header.masking_key))), - ("payload - " + repr(str(self.payload))), - ]) + hdr = self.header.human_readable() + return hdr + "\n" + repr(self.payload) def to_bytes(self): """ diff --git a/test/test_utils.py b/test/test_utils.py index 0cdd3faed..942136fd5 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -5,9 +5,10 @@ import tutils def test_bidi(): b = utils.BiDi(a=1, b=2) assert b.a == 1 - assert b[1] == "a" + assert b.get_name(1) == "a" + assert b.get_name(5) is None tutils.raises(AttributeError, getattr, b, "c") - tutils.raises(KeyError, b.__getitem__, 5) + tutils.raises(ValueError, utils.BiDi, one=1, two=1) def test_hexdump(): @@ -19,3 +20,10 @@ def test_cleanBin(): assert utils.cleanBin("\00ne") == ".ne" assert utils.cleanBin("\nne") == "\nne" assert utils.cleanBin("\nne", True) == ".ne" + + +def test_pretty_size(): + assert utils.pretty_size(100) == "100B" + assert utils.pretty_size(1024) == "1kB" + assert utils.pretty_size(1024 + (1024/2.0)) == "1.5kB" + assert utils.pretty_size(1024*1024) == "1MB" diff --git a/test/test_websockets.py b/test/test_websockets.py index 9266d93ef..d8e56a8fd 100644 --- a/test/test_websockets.py +++ b/test/test_websockets.py @@ -185,7 +185,13 @@ class TestFrameHeader: round(masking_key="test") def test_human_readable(self): - f = websockets.FrameHeader(masking_key="test", mask=False) + f = websockets.FrameHeader( + masking_key="test", + fin=True, + payload_length=10 + ) + assert f.human_readable() + f = websockets.FrameHeader() assert f.human_readable() def test_funky(self):