gRPC packed repeated encoding (#4872)

* Replaced Kaitai protobuf parser with custom implementation, to prepare packed data handling

* Clamp varint size for bool conversion to 64bit, to satify tests

* moved WireParser into ProtoParser

* preserve work state

* Full packed support

* noc changes to example addon

* Adjusted test regex for ValueError

* Do not try to unpack fields twice
This commit is contained in:
mame82 2021-10-21 13:20:13 +02:00 committed by GitHub
parent eea48c6c27
commit 3a884ceb8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 684 additions and 177 deletions

View File

@ -7,8 +7,6 @@ from typing import Dict, Generator, Iterable, Iterator, List, Optional, Tuple, U
from mitmproxy import contentviews, ctx, flow, flowfilter, http from mitmproxy import contentviews, ctx, flow, flowfilter, http
from mitmproxy.contentviews import base from mitmproxy.contentviews import base
from mitmproxy.contrib.kaitaistruct.google_protobuf import GoogleProtobuf
from mitmproxy.contrib.kaitaistruct.vlq_base128_le import VlqBase128Le
from mitmproxy.net.encoding import decode from mitmproxy.net.encoding import decode
@ -108,6 +106,9 @@ class ProtoParser:
name: Optional[str] = None name: Optional[str] = None
"""optional: intended field for visualization (parser fails over to alternate decoding if not possible)""" """optional: intended field for visualization (parser fails over to alternate decoding if not possible)"""
as_packed: Optional[bool] = False
"""optional: if set to true, the field is considered to be repeated and packed"""
@dataclass @dataclass
class ParserOptions: class ParserOptions:
# output should contain wiretype of fields # output should contain wiretype of fields
@ -143,115 +144,252 @@ class ProtoParser:
string = 14 string = 14
bytes = 15 bytes = 15
message = 16 message = 16
packed_repeated_field = 17
# helper # helper
unknown = 18 unknown = 17
class Message: @staticmethod
def __init__( def _read_base128le(data: bytes) -> Tuple[int, int]:
self, res = 0
data: bytes, offset = 0
options: ProtoParser.ParserOptions, while offset < len(data):
rules: List[ProtoParser.ParserRule], o = data[offset]
parent_field: ProtoParser.Field = None, res += ((o & 0x7f) << (7 * offset))
) -> None: offset += 1
self.data: bytes = data if o < 0x80:
self.parent_field: Optional[ProtoParser.Field] = parent_field # the Kaitai parser for protobuf support base128 le values up
self.options: ProtoParser.ParserOptions = options # to 8 groups (bytes). Due to the nature of the encoding, each
self.rules: List[ProtoParser.ParserRule] = rules # group attributes 7bit to the resulting value, which give
try: # a 56 bit value at maximum.
self.fields: List[ProtoParser.Field] = self.parse_message_fields(data) # The values which get encoded into protobuf variable length integers,
except: # on the other hand, include full 64bit types (int64, uint64, sint64).
raise ValueError("not a valid protobuf message") # This means, the Kaitai encoder can not cover the full range of
# possible values
#
# This decoder puts no limitation on the maximum value of variable
# length integers. Values exceeding 64bit have to be handled externally
return offset, res
raise ValueError("varint exceeds bounds of provided data")
def parse_message_fields(self, message: bytes) -> List: @staticmethod
res: List[ProtoParser.Field] = [] def _read_u32(data: bytes) -> Tuple[int, int]:
return 4, struct.unpack("<I", data[:4])[0]
pb: GoogleProtobuf = GoogleProtobuf.from_bytes(message) @staticmethod
for pair in pb.pairs: def _read_u64(data: bytes) -> Tuple[int, int]:
tag = pair.field_tag return 8, struct.unpack("<Q", data[:8])[0]
wt = pair.wire_type
if wt == GoogleProtobuf.Pair.WireTypes.group_start or wt == GoogleProtobuf.Pair.WireTypes.group_end:
# raise error on deprecated types without values
raise ValueError("deprecated field: {}".format(wt))
v: Union[GoogleProtobuf.DelimitedBytes, VlqBase128Le] = pair.value # for WireType bit-32 and bit-64
preferred_decoding = ProtoParser.DecodedTypes.unknown
# see: https://www.oreilly.com/library/view/grpc-up-and/9781492058328/ch04.html
if wt == GoogleProtobuf.Pair.WireTypes.len_delimited:
assert isinstance(v, GoogleProtobuf.DelimitedBytes)
v = v.body
assert isinstance(v, bytes)
# always try to parse length delimited data as nested protobuf message
preferred_decoding = ProtoParser.DecodedTypes.message
if wt == GoogleProtobuf.Pair.WireTypes.varint:
assert isinstance(v, VlqBase128Le)
v = v.value
assert isinstance(v, int)
if v.bit_length() > 32:
preferred_decoding = ProtoParser.DecodedTypes.uint64
else:
preferred_decoding = ProtoParser.DecodedTypes.uint32
if wt == GoogleProtobuf.Pair.WireTypes.bit_64:
# exists in Protobuf for efficient encoding, when decoded comes down to uint64
assert isinstance(v, int)
preferred_decoding = ProtoParser.DecodedTypes.fixed64
if wt == GoogleProtobuf.Pair.WireTypes.bit_32:
# exists in Protobuf for efficient encoding, when decoded comes down to uint32
assert isinstance(v, int)
preferred_decoding = ProtoParser.DecodedTypes.fixed32
field = ProtoParser.Field( class WireTypes(Enum):
preferred_decoding=preferred_decoding, varint = 0
wire_type=wt, bit_64 = 1
tag=tag, len_delimited = 2
wire_value=v, group_start = 3
owning_message=self, group_end = 4
options=self.options, bit_32 = 5
rules=self.rules
)
res.append(field)
return res
def gen_fields(self) -> Generator[ProtoParser.Field, None, None]: @staticmethod
for f in self.fields: def read_fields(
yield f wire_data: bytes,
parent_field: Optional[ProtoParser.Field],
options: ProtoParser.ParserOptions,
rules: List[ProtoParser.ParserRule]
) -> List[ProtoParser.Field]:
res: List[ProtoParser.Field] = []
pos = 0
while pos < len(wire_data):
# read field key (tag and wire_type)
offset, key = ProtoParser._read_base128le(wire_data[pos:])
# casting raises exception for invalid WireTypes
wt = ProtoParser.WireTypes((key & 7))
tag = (key >> 3)
pos += offset
def gen_flat_decoded_field_dicts(self) -> Generator[Dict, None, None]: val: Union[bytes, int]
""" preferred_decoding: ProtoParser.DecodedTypes
This generator returns a flattened version of the fields from a message (including nested fields) if wt == ProtoParser.WireTypes.varint:
offset, val = ProtoParser._read_base128le(wire_data[pos:])
A single entry has the form: pos += offset
{ bl = val.bit_length()
"tag": str # fully qualified tag (all tags starting from the root message, concatenated with '.' delimiter) if bl > 64:
"wireType": str # describes the wire encoding used by the field preferred_decoding = ProtoParser.DecodedTypes.unknown
"decoding": str # describes the chosen decoding (interpretation of wire encoding, according to protobuf types) if bl > 32:
"val": Union[bool, str, bytes, int, float] # the decoded value in python representation preferred_decoding = ProtoParser.DecodedTypes.uint64
}
"""
# iterate over fields
for f in self.gen_fields():
# convert field and nested fields to dicts
for d in f.gen_flat_decoded_field_dicts():
yield d
def gen_string_rows(self) -> Generator[Tuple[str, ...], None, None]:
# Excluding fields containing message headers simplifies the view, but without
# knowing the message tags, they can not be used in a custom definition, in order
# to declare a different interpretation for the message (the message is a length-delimeted
# field value, which could alternatively be parsed as 'str' or 'bytes' if the field tag
# is known)
for field_dict in self.gen_flat_decoded_field_dicts():
if self.options.exclude_message_headers and field_dict["decoding"] == "message":
continue
if self.options.include_wiretype:
col1 = "[{}->{}]".format(field_dict["wireType"], field_dict["decoding"])
else: else:
col1 = "[{}]".format(field_dict["decoding"]) preferred_decoding = ProtoParser.DecodedTypes.uint32
col2 = field_dict["name"] # empty string if not set (consumes no space) elif wt == ProtoParser.WireTypes.bit_64:
col3 = field_dict["tag"] offset, val = ProtoParser._read_u64(wire_data[pos:])
col4 = str(field_dict["val"]) pos += offset
yield col1, col2, col3, col4 preferred_decoding = ProtoParser.DecodedTypes.fixed64
elif wt == ProtoParser.WireTypes.len_delimited:
offset, length = ProtoParser._read_base128le(wire_data[pos:])
pos += offset
if length > len(wire_data[pos:]):
raise ValueError("length delimited field exceeds data size")
val = wire_data[pos:pos + length]
pos += length
preferred_decoding = ProtoParser.DecodedTypes.message
elif (
wt == ProtoParser.WireTypes.group_start or
wt == ProtoParser.WireTypes.group_end
):
raise ValueError("deprecated field: {}".format(wt))
elif wt == ProtoParser.WireTypes.bit_32:
offset, val = ProtoParser._read_u32(wire_data[pos:])
pos += offset
preferred_decoding = ProtoParser.DecodedTypes.fixed32
else:
# not reachable as if-else statements contain all possible WireTypes
# wrong types raise Exception during typecasting in `wt = ProtoParser.WireTypes((key & 7))`
raise ValueError("invalid WireType for protobuf messsage field")
field = ProtoParser.Field(
wire_type=wt,
preferred_decoding=preferred_decoding,
options=options,
rules=rules,
tag=tag,
wire_value=val,
parent_field=parent_field
)
res.append(field)
return res
@staticmethod
def read_packed_fields(
packed_field: ProtoParser.Field,
) -> List[ProtoParser.Field]:
if not isinstance(packed_field.wire_value, bytes):
ctx.log(type(packed_field.wire_value))
raise ValueError("can not unpack field with data other than bytes")
wire_data: bytes = packed_field.wire_value
tag: int = packed_field.tag
options: ProtoParser.ParserOptions = packed_field.options
rules: List[ProtoParser.ParserRule] = packed_field.rules
intended_decoding: ProtoParser.DecodedTypes = packed_field.preferred_decoding
# the packed field has to have WireType length delimited, whereas the contained
# individual types have to have a different WireType, which is derived from
# the intended decoding
if (
packed_field.wire_type != ProtoParser.WireTypes.len_delimited or
not isinstance(packed_field.wire_value, bytes)
):
raise ValueError("packed fields have to be embedded in a length delimited message")
# wiretype to read has to be determined from intended decoding
packed_wire_type: ProtoParser.WireTypes
if (
intended_decoding == ProtoParser.DecodedTypes.int32 or
intended_decoding == ProtoParser.DecodedTypes.int64 or
intended_decoding == ProtoParser.DecodedTypes.uint32 or
intended_decoding == ProtoParser.DecodedTypes.uint64 or
intended_decoding == ProtoParser.DecodedTypes.sint32 or
intended_decoding == ProtoParser.DecodedTypes.sint64 or
intended_decoding == ProtoParser.DecodedTypes.bool or
intended_decoding == ProtoParser.DecodedTypes.enum
):
packed_wire_type = ProtoParser.WireTypes.varint
elif (
intended_decoding == ProtoParser.DecodedTypes.fixed32 or
intended_decoding == ProtoParser.DecodedTypes.sfixed32 or
intended_decoding == ProtoParser.DecodedTypes.float
):
packed_wire_type = ProtoParser.WireTypes.bit_32
elif (
intended_decoding == ProtoParser.DecodedTypes.fixed64 or
intended_decoding == ProtoParser.DecodedTypes.sfixed64 or
intended_decoding == ProtoParser.DecodedTypes.double
):
packed_wire_type = ProtoParser.WireTypes.bit_64
elif (
intended_decoding == ProtoParser.DecodedTypes.string or
intended_decoding == ProtoParser.DecodedTypes.bytes or
intended_decoding == ProtoParser.DecodedTypes.message
):
packed_wire_type = ProtoParser.WireTypes.len_delimited
else:
# should never happen, no test
raise TypeError("Wire type could not be determined from packed decoding type")
res: List[ProtoParser.Field] = []
pos = 0
val: Union[bytes, int]
if packed_wire_type == ProtoParser.WireTypes.varint:
while pos < len(wire_data):
offset, val = ProtoParser._read_base128le(wire_data[pos:])
pos += offset
res.append(ProtoParser.Field(
options=options,
preferred_decoding=intended_decoding,
rules=rules,
tag=tag,
wire_type=packed_wire_type,
wire_value=val,
parent_field=packed_field.parent_field,
is_unpacked_children=True
))
elif packed_wire_type == ProtoParser.WireTypes.bit_64:
if len(wire_data) % 8 != 0:
raise ValueError("can not parse as packed bit64")
while pos < len(wire_data):
offset, val = ProtoParser._read_u64(wire_data[pos:])
pos += offset
res.append(ProtoParser.Field(
options=options,
preferred_decoding=intended_decoding,
rules=rules,
tag=tag,
wire_type=packed_wire_type,
wire_value=val,
parent_field=packed_field.parent_field,
is_unpacked_children=True
))
elif packed_wire_type == ProtoParser.WireTypes.len_delimited:
while pos < len(wire_data):
offset, length = ProtoParser._read_base128le(wire_data[pos:])
pos += offset
val = wire_data[pos: pos + length]
if length > len(wire_data[pos:]):
raise ValueError("packed length delimited field exceeds data size")
res.append(ProtoParser.Field(
options=options,
preferred_decoding=intended_decoding,
rules=rules,
tag=tag,
wire_type=packed_wire_type,
wire_value=val,
parent_field=packed_field.parent_field,
is_unpacked_children=True
))
pos += length
elif (
packed_wire_type == ProtoParser.WireTypes.group_start or
packed_wire_type == ProtoParser.WireTypes.group_end
):
raise ValueError("group tags can not be encoded packed")
elif packed_wire_type == ProtoParser.WireTypes.bit_32:
if len(wire_data) % 4 != 0:
raise ValueError("can not parse as packed bit32")
while pos < len(wire_data):
offset, val = ProtoParser._read_u32(wire_data[pos:])
pos += offset
res.append(ProtoParser.Field(
options=options,
preferred_decoding=intended_decoding,
rules=rules,
tag=tag,
wire_type=packed_wire_type,
wire_value=val,
parent_field=packed_field.parent_field,
is_unpacked_children=True
))
else:
# should never happen
raise ValueError("invalid WireType for protobuf messsage field")
# mark parent field as packed parent (if we got here, unpacking succeeded)
packed_field.is_packed_parent = True
return res
class Field: class Field:
""" """
@ -301,36 +439,46 @@ class ProtoParser:
def __init__( def __init__(
self, self,
wire_type: GoogleProtobuf.Pair.WireTypes, wire_type: ProtoParser.WireTypes,
preferred_decoding: ProtoParser.DecodedTypes, preferred_decoding: ProtoParser.DecodedTypes,
tag: int, tag: int,
parent_field: Optional[ProtoParser.Field],
wire_value: Union[int, bytes], wire_value: Union[int, bytes],
owning_message: ProtoParser.Message,
options: ProtoParser.ParserOptions, options: ProtoParser.ParserOptions,
rules: List[ProtoParser.ParserRule] rules: List[ProtoParser.ParserRule],
is_unpacked_children: bool = False
) -> None: ) -> None:
self.wire_type: GoogleProtobuf.Pair.WireTypes = wire_type self.wire_type: ProtoParser.WireTypes = wire_type
self.preferred_decoding: ProtoParser.DecodedTypes = preferred_decoding self.preferred_decoding: ProtoParser.DecodedTypes = preferred_decoding
self.wire_value: Union[int, bytes] = wire_value self.wire_value: Union[int, bytes] = wire_value
self.tag: int = tag self.tag: int = tag
self.owning_message: ProtoParser.Message = owning_message
self.options: ProtoParser.ParserOptions = options self.options: ProtoParser.ParserOptions = options
self.name: str = "" self.name: str = ""
self.rules: List[ProtoParser.ParserRule] = rules self.rules: List[ProtoParser.ParserRule] = rules
self.parent_tags: List[int] self.parent_field: Optional[ProtoParser.Field] = parent_field
if not self.owning_message.parent_field: self.is_unpacked_children: bool = is_unpacked_children # marks field as being a result of unpacking
self.parent_tags = [] self.is_packed_parent: bool = False # marks field as being parent of successfully unpacked children
else: self.parent_tags: List[int] = []
self.parent_tags = self.owning_message.parent_field.parent_tags[:] if self.parent_field is not None:
self.parent_tags.append(self.owning_message.parent_field.tag) self.parent_tags = self.parent_field.parent_tags[:]
self.parent_tags.append(self.parent_field.tag)
self.try_unpack = False
# rules can overwrite self.try_unpack
self.apply_rules() self.apply_rules()
# do not unpack fields which are the result of unpacking
if (
parent_field is not None and
self.is_unpacked_children
):
self.try_unpack = False
# no tests for only_first_hit=False, as not user-changable # no tests for only_first_hit=False, as not user-changable
def apply_rules(self, only_first_hit=True): def apply_rules(self, only_first_hit=True):
tag_str = self._gen_tag_str() tag_str = self._gen_tag_str()
name = None name = None
decoding = None decoding = None
as_packed = False
try: try:
for rule in self.rules: for rule in self.rules:
for fd in rule.field_definitions: for fd in rule.field_definitions:
@ -347,17 +495,21 @@ class ProtoParser:
# only first match # only first match
self.name = fd.name self.name = fd.name
self.preferred_decoding = fd.intended_decoding self.preferred_decoding = fd.intended_decoding
self.try_unpack = fd.as_packed
return return
else: else:
# overwrite matches till last rule was inspected # overwrite matches till last rule was inspected
# (f.e. allows to define name in one rule and intended_decoding in another one) # (f.e. allows to define name in one rule and intended_decoding in another one)
name = fd.name if fd.name else name name = fd.name if fd.name else name
decoding = fd.intended_decoding if fd.intended_decoding else decoding decoding = fd.intended_decoding if fd.intended_decoding else decoding
if fd.as_packed:
as_packed = True
if name: if name:
self.name = name self.name = name
if decoding: if decoding:
self.preferred_decoding = decoding self.preferred_decoding = decoding
self.try_unpack = as_packed
except Exception as e: except Exception as e:
ctx.log.warn(e) ctx.log.warn(e)
pass pass
@ -369,35 +521,36 @@ class ProtoParser:
def safe_decode_as( def safe_decode_as(
self, self,
intended_decoding: ProtoParser.DecodedTypes intended_decoding: ProtoParser.DecodedTypes,
) -> Tuple[ProtoParser.DecodedTypes, Union[bool, float, int, bytes, str, ProtoParser.Message]]: try_as_packed: bool = False
) -> Tuple[ProtoParser.DecodedTypes, Union[bool, float, int, bytes, str, List[ProtoParser.Field]]]:
""" """
Tries to decode as intended, applies failover, if not possible Tries to decode as intended, applies failover, if not possible
Returns selected decoding and decoded value Returns selected decoding and decoded value
""" """
if self.wire_type == GoogleProtobuf.Pair.WireTypes.varint: if self.wire_type == ProtoParser.WireTypes.varint:
try: try:
return intended_decoding, self.decode_as(intended_decoding) return intended_decoding, self.decode_as(intended_decoding, try_as_packed)
except: except:
if int(self.wire_value).bit_length() > 32: if int(self.wire_value).bit_length() > 32:
# ignore the fact that varint could exceed 64bit (would violate the specs) # ignore the fact that varint could exceed 64bit (would violate the specs)
return ProtoParser.DecodedTypes.uint64, self.wire_value return ProtoParser.DecodedTypes.uint64, self.wire_value
else: else:
return ProtoParser.DecodedTypes.uint32, self.wire_value return ProtoParser.DecodedTypes.uint32, self.wire_value
elif self.wire_type == GoogleProtobuf.Pair.WireTypes.bit_64: elif self.wire_type == ProtoParser.WireTypes.bit_64:
try: try:
return intended_decoding, self.decode_as(intended_decoding) return intended_decoding, self.decode_as(intended_decoding, try_as_packed)
except: except:
return ProtoParser.DecodedTypes.fixed64, self.wire_value return ProtoParser.DecodedTypes.fixed64, self.wire_value
elif self.wire_type == GoogleProtobuf.Pair.WireTypes.bit_32: elif self.wire_type == ProtoParser.WireTypes.bit_32:
try: try:
return intended_decoding, self.decode_as(intended_decoding) return intended_decoding, self.decode_as(intended_decoding, try_as_packed)
except: except:
return ProtoParser.DecodedTypes.fixed32, self.wire_value return ProtoParser.DecodedTypes.fixed32, self.wire_value
elif self.wire_type == GoogleProtobuf.Pair.WireTypes.len_delimited: elif self.wire_type == ProtoParser.WireTypes.len_delimited:
try: try:
return intended_decoding, self.decode_as(intended_decoding) return intended_decoding, self.decode_as(intended_decoding, try_as_packed)
except: except:
# failover strategy: message --> string (valid UTF-8) --> bytes # failover strategy: message --> string (valid UTF-8) --> bytes
len_delimited_strategy: List[ProtoParser.DecodedTypes] = [ len_delimited_strategy: List[ProtoParser.DecodedTypes] = [
@ -406,12 +559,12 @@ class ProtoParser:
ProtoParser.DecodedTypes.bytes # should always work ProtoParser.DecodedTypes.bytes # should always work
] ]
for failover_decoding in len_delimited_strategy: for failover_decoding in len_delimited_strategy:
if failover_decoding == intended_decoding: if failover_decoding == intended_decoding and not try_as_packed:
continue # don't try it twice # don't try same decoding twice, unless first attempt was packed
continue
try: try:
return failover_decoding, self.decode_as(failover_decoding) return failover_decoding, self.decode_as(failover_decoding, False)
except: except:
# move on with next
pass pass
# we should never get here (could not be added to tests) # we should never get here (could not be added to tests)
@ -419,19 +572,23 @@ class ProtoParser:
def decode_as( def decode_as(
self, self,
intended_decoding: ProtoParser.DecodedTypes intended_decoding: ProtoParser.DecodedTypes,
) -> Union[bool, int, float, bytes, str, ProtoParser.Message]: as_packed: bool = False
if self.wire_type == GoogleProtobuf.Pair.WireTypes.varint: ) -> Union[bool, int, float, bytes, str, List[ProtoParser.Field]]:
if as_packed is True:
return ProtoParser.read_packed_fields(packed_field=self)
if self.wire_type == ProtoParser.WireTypes.varint:
assert isinstance(self.wire_value, int) assert isinstance(self.wire_value, int)
if intended_decoding == ProtoParser.DecodedTypes.bool: if intended_decoding == ProtoParser.DecodedTypes.bool:
return self.wire_value != 0 # clamp result to 64bit
return self.wire_value & 0xffffffffffffffff != 0
elif intended_decoding == ProtoParser.DecodedTypes.int32: elif intended_decoding == ProtoParser.DecodedTypes.int32:
if self.wire_value.bit_length() > 32: if self.wire_value.bit_length() > 32:
raise TypeError("wire value too large for int32") raise TypeError("wire value too large for int32")
return struct.unpack("!i", struct.pack("!I", self.wire_value))[0] return struct.unpack("!i", struct.pack("!I", self.wire_value))[0]
elif intended_decoding == ProtoParser.DecodedTypes.int64: elif intended_decoding == ProtoParser.DecodedTypes.int64:
if self.wire_value.bit_length() > 64: if self.wire_value.bit_length() > 64:
# currently avoided by kaitai decoder (can not be added to tests)
raise TypeError("wire value too large for int64") raise TypeError("wire value too large for int64")
return struct.unpack("!q", struct.pack("!Q", self.wire_value))[0] return struct.unpack("!q", struct.pack("!Q", self.wire_value))[0]
elif intended_decoding == ProtoParser.DecodedTypes.uint32: elif intended_decoding == ProtoParser.DecodedTypes.uint32:
@ -443,7 +600,6 @@ class ProtoParser:
intended_decoding == ProtoParser.DecodedTypes.enum intended_decoding == ProtoParser.DecodedTypes.enum
): ):
if self.wire_value.bit_length() > 64: if self.wire_value.bit_length() > 64:
# currently avoided by kaitai decoder (can not be added to tests)
raise TypeError("wire value too large") raise TypeError("wire value too large")
return self.wire_value # already 'int' which was parsed as unsigned return self.wire_value # already 'int' which was parsed as unsigned
elif intended_decoding == ProtoParser.DecodedTypes.sint32: elif intended_decoding == ProtoParser.DecodedTypes.sint32:
@ -452,7 +608,6 @@ class ProtoParser:
return (self.wire_value >> 1) ^ -(self.wire_value & 1) # zigzag_decode return (self.wire_value >> 1) ^ -(self.wire_value & 1) # zigzag_decode
elif intended_decoding == ProtoParser.DecodedTypes.sint64: elif intended_decoding == ProtoParser.DecodedTypes.sint64:
if self.wire_value.bit_length() > 64: if self.wire_value.bit_length() > 64:
# currently avoided by kaitai decoder (can not be added to tests)
raise TypeError("wire value too large for sint64") raise TypeError("wire value too large for sint64")
# ZigZag decode # ZigZag decode
# Ref: https://gist.github.com/mfuerstenau/ba870a29e16536fdbaba # Ref: https://gist.github.com/mfuerstenau/ba870a29e16536fdbaba
@ -463,37 +618,35 @@ class ProtoParser:
): ):
# special case, not complying to protobuf specs # special case, not complying to protobuf specs
return self._wire_value_as_float() return self._wire_value_as_float()
elif self.wire_type == GoogleProtobuf.Pair.WireTypes.bit_64: elif self.wire_type == ProtoParser.WireTypes.bit_64:
if intended_decoding == ProtoParser.DecodedTypes.fixed64: if intended_decoding == ProtoParser.DecodedTypes.fixed64:
return self.wire_value return self.wire_value
elif intended_decoding == ProtoParser.DecodedTypes.sfixed64: elif intended_decoding == ProtoParser.DecodedTypes.sfixed64:
return struct.unpack("!q", struct.pack("!Q", self.wire_value))[0] return struct.unpack("!q", struct.pack("!Q", self.wire_value))[0]
elif intended_decoding == ProtoParser.DecodedTypes.double: elif intended_decoding == ProtoParser.DecodedTypes.double:
return self._wire_value_as_float() return self._wire_value_as_float()
elif self.wire_type == GoogleProtobuf.Pair.WireTypes.bit_32: elif self.wire_type == ProtoParser.WireTypes.bit_32:
if intended_decoding == ProtoParser.DecodedTypes.fixed32: if intended_decoding == ProtoParser.DecodedTypes.fixed32:
return self.wire_value return self.wire_value
elif intended_decoding == ProtoParser.DecodedTypes.sfixed32: elif intended_decoding == ProtoParser.DecodedTypes.sfixed32:
return struct.unpack("!i", struct.pack("!I", self.wire_value))[0] return struct.unpack("!i", struct.pack("!I", self.wire_value))[0]
elif intended_decoding == ProtoParser.DecodedTypes.float: elif intended_decoding == ProtoParser.DecodedTypes.float:
return self._wire_value_as_float() return self._wire_value_as_float()
elif self.wire_type == GoogleProtobuf.Pair.WireTypes.len_delimited: elif self.wire_type == ProtoParser.WireTypes.len_delimited:
assert isinstance(self.wire_value, bytes) assert isinstance(self.wire_value, bytes)
if intended_decoding == ProtoParser.DecodedTypes.string: if intended_decoding == ProtoParser.DecodedTypes.string:
# According to specs, a protobuf string HAS TO be UTF-8 parsable # According to specs, a protobuf string HAS TO be UTF-8 parsable
# throw exception on invalid UTF-8 chars, but escape linebreaks # throw exception on invalid UTF-8 chars, but escape linebreaks
return self.wire_value_as_utf8(escape_invalid=False, escape_newline=True) return self.wire_value_as_utf8(escape_newline=True)
elif intended_decoding == ProtoParser.DecodedTypes.bytes: elif intended_decoding == ProtoParser.DecodedTypes.bytes:
# always works, assure to hand back a copy # always works, assure to hand back a copy
return self.wire_value[:] return self.wire_value[:]
elif intended_decoding == ProtoParser.DecodedTypes.packed_repeated_field:
raise NotImplementedError("currently not needed")
elif intended_decoding == ProtoParser.DecodedTypes.message: elif intended_decoding == ProtoParser.DecodedTypes.message:
return ProtoParser.Message( return ProtoParser.read_fields(
data=self.wire_value, wire_data=self.wire_value,
options=self.options,
parent_field=self, parent_field=self,
rules=self.rules options=self.options,
rules=self.rules,
) )
# if here, there is no valid decoding # if here, there is no valid decoding
@ -533,7 +686,7 @@ class ProtoParser:
if self.wire_value.bit_length() > 64: if self.wire_value.bit_length() > 64:
# source for a python int are wiretypes varint/bit_32/bit64 and should never convert to int values 64bit # source for a python int are wiretypes varint/bit_32/bit64 and should never convert to int values 64bit
# currently avoided by kaitai decoder (can not be added to tests) # currently avoided by kaitai decoder (can not be added to tests)
raise ValueError("Value exceeds 64bit, violating protobuf specs") raise ValueError("value exceeds 64bit, violating protobuf specs")
elif self.wire_value.bit_length() > 32: elif self.wire_value.bit_length() > 32:
# packing uses network byte order (to assure consistent results across architectures) # packing uses network byte order (to assure consistent results across architectures)
return struct.pack("!Q", self.wire_value) return struct.pack("!Q", self.wire_value)
@ -550,12 +703,9 @@ class ProtoParser:
def _decoding_str(self, decoding: ProtoParser.DecodedTypes): def _decoding_str(self, decoding: ProtoParser.DecodedTypes):
return str(decoding).split(".")[-1] return str(decoding).split(".")[-1]
def wire_value_as_utf8(self, escape_invalid=True, escape_newline=True) -> str: def wire_value_as_utf8(self, escape_newline=True) -> str:
if isinstance(self.wire_value, bytes): if isinstance(self.wire_value, bytes):
if escape_invalid: res = self.wire_value.decode("utf-8")
res = self.wire_value.decode("utf-8", "backslashreplace")
else:
res = self.wire_value.decode("utf-8")
return res.replace("\n", "\\n") if escape_newline else res return res.replace("\n", "\\n") if escape_newline else res
return str(self.wire_value) return str(self.wire_value)
@ -568,18 +718,23 @@ class ProtoParser:
If the field holds a nested message, the fields contained in the message are appended. If the field holds a nested message, the fields contained in the message are appended.
Ultimately this flattens all fields recursively. Ultimately this flattens all fields recursively.
""" """
selected_decoding, decoded_val = self.safe_decode_as(self.preferred_decoding) selected_decoding, decoded_val = self.safe_decode_as(self.preferred_decoding, self.try_unpack)
field_desc_dict = { field_desc_dict = {
"tag": self._gen_tag_str(), "tag": self._gen_tag_str(),
"wireType": self._wire_type_str(), "wireType": self._wire_type_str(),
"decoding": self._decoding_str(selected_decoding), "decoding": self._decoding_str(selected_decoding),
"name": self.name, "name": self.name,
} }
if isinstance(decoded_val, ProtoParser.Message): if isinstance(decoded_val, list):
field_desc_dict["val"] = "" # message has no value, because contained fields get appended (flattened) if (
yield field_desc_dict selected_decoding == ProtoParser.DecodedTypes.message # field is a message with subfields
# the value is an embedded message, thus add the message fields and not self.is_packed_parent # field is a message, but replaced by packed fields
for f in decoded_val.gen_fields(): ):
# Field is a message, not packed, thus include it as message header
field_desc_dict["val"] = ""
yield field_desc_dict
# add sub-fields of messages or packed fields
for f in decoded_val:
for field_dict in f.gen_flat_decoded_field_dicts(): for field_dict in f.gen_flat_decoded_field_dicts():
yield field_dict yield field_dict
else: else:
@ -599,15 +754,35 @@ class ProtoParser:
if rules is None: if rules is None:
rules = [] rules = []
self.rules = rules self.rules = rules
self.root_message: ProtoParser.Message = ProtoParser.Message(
data=data, try:
options=self.options, self.root_fields: List[ProtoParser.Field] = ProtoParser.read_fields(
rules=self.rules wire_data=self.data,
) options=self.options,
parent_field=None,
rules=self.rules
)
except Exception as e:
raise ValueError("not a valid protobuf message") from e
def gen_flat_decoded_field_dicts(self) -> Generator[Dict, None, None]:
for f in self.root_fields:
for field_dict in f.gen_flat_decoded_field_dicts():
yield field_dict
def gen_str_rows(self) -> Generator[Tuple[str, ...], None, None]: def gen_str_rows(self) -> Generator[Tuple[str, ...], None, None]:
for f in self.root_message.gen_string_rows(): for field_dict in self.gen_flat_decoded_field_dicts():
yield f if self.options.exclude_message_headers and field_dict["decoding"] == "message":
continue
if self.options.include_wiretype:
col1 = "[{}->{}]".format(field_dict["wireType"], field_dict["decoding"])
else:
col1 = "[{}]".format(field_dict["decoding"])
col2 = field_dict["name"] # empty string if not set (consumes no space)
col3 = field_dict["tag"]
col4 = str(field_dict["val"])
yield col1, col2, col3, col4
# Note: all content view formating functionality is kept out of the ProtoParser class, to # Note: all content view formating functionality is kept out of the ProtoParser class, to

View File

@ -1,5 +1,6 @@
import pytest import pytest
from typing import List
from mitmproxy.contentviews import grpc from mitmproxy.contentviews import grpc
from mitmproxy.contentviews.grpc import ViewGrpcProtobuf, ViewConfig, ProtoParser, parse_grpc_messages from mitmproxy.contentviews.grpc import ViewGrpcProtobuf, ViewConfig, ProtoParser, parse_grpc_messages
from mitmproxy.net.encoding import encode from mitmproxy.net.encoding import encode
@ -353,7 +354,7 @@ def helper_gen_bits32_msg_field(f_idx: int, f_val: int):
def helper_gen_bits64_msg_field(f_idx: int, f_val: int): def helper_gen_bits64_msg_field(f_idx: int, f_val: int):
# manual encoding of protobuf data # manual encoding of protobuf data
f_wt = 1 # field type 1 (bits32) f_wt = 1 # field type 1 (bits64)
tag = (f_idx << 3) | f_wt # combined tag tag = (f_idx << 3) | f_wt # combined tag
msg = helper_encode_base128le(tag) # add encoded tag to message msg = helper_encode_base128le(tag) # add encoded tag to message
msg = msg + struct.pack("<Q", f_val) # add varint encoded field value msg = msg + struct.pack("<Q", f_val) # add varint encoded field value
@ -370,9 +371,40 @@ def helper_gen_lendel_msg_field(f_idx: int, f_val: bytes):
return msg return msg
def test_special_decoding(): def helper_gen_bits64_msg_field_packed(f_idx: int, values: List[int]):
from mitmproxy.contrib.kaitaistruct.google_protobuf import GoogleProtobuf # manual encoding of protobuf data
msg_inner = b""
for f_val in values:
msg_inner = msg_inner + struct.pack("<Q", f_val) # add bits64 encoded field value
return helper_gen_lendel_msg_field(f_idx, msg_inner)
def helper_gen_bits32_msg_field_packed(f_idx: int, values: List[int]):
# manual encoding of protobuf data
msg_inner = b""
for f_val in values:
msg_inner = msg_inner + struct.pack("<I", f_val) # add bits32 encoded field value
return helper_gen_lendel_msg_field(f_idx, msg_inner)
def helper_gen_varint_msg_field_packed(f_idx: int, values: List[int]):
# manual encoding of protobuf data
msg_inner = b""
for f_val in values:
msg_inner = msg_inner + helper_encode_base128le(f_val) # add varint encoded field value
return helper_gen_lendel_msg_field(f_idx, msg_inner)
def helper_gen_lendel_msg_field_packed(f_idx: int, values: List[bytes]):
# manual encoding of protobuf data
msg_inner = b""
for f_val in values:
msg_inner = msg_inner + helper_encode_base128le(len(f_val)) # add length of message
msg_inner = msg_inner + f_val
return helper_gen_lendel_msg_field(f_idx, msg_inner)
def test_special_decoding():
msg = helper_gen_varint_msg_field(1, 1) # small varint msg = helper_gen_varint_msg_field(1, 1) # small varint
msg += helper_gen_varint_msg_field(2, 1 << 32) # varint > 32bit msg += helper_gen_varint_msg_field(2, 1 << 32) # varint > 32bit
msg += helper_gen_varint_msg_field(3, 1 << 64) # varint > 64bit (returned as 0x0 by Kaitai protobuf decoder) msg += helper_gen_varint_msg_field(3, 1 << 64) # varint > 64bit (returned as 0x0 by Kaitai protobuf decoder)
@ -380,17 +412,15 @@ def test_special_decoding():
msg += helper_gen_bits64_msg_field(5, 0xbff199999999999a) # bits64 msg += helper_gen_bits64_msg_field(5, 0xbff199999999999a) # bits64
msg += helper_gen_varint_msg_field(6, 0xffffffff) # 32 bit varint negative msg += helper_gen_varint_msg_field(6, 0xffffffff) # 32 bit varint negative
msg += helper_gen_lendel_msg_field(7, b"hello world") # length delimted message, UTF-8 parsable msg += helper_gen_lendel_msg_field(7, b"hello world") # length delimted message, UTF-8 parsable
msg += helper_gen_varint_msg_field(8, 1 << 128) # oversized varint
parser = ProtoParser.Message( parser = ProtoParser(
data=msg, data=msg,
parent_field=[], parser_options=ProtoParser.ParserOptions(),
rules=[], rules=[]
options=ProtoParser.ParserOptions()
) )
# print(list(parser.gen_string_rows()))
# return
fields = list(parser.gen_fields()) fields = parser.root_fields
assert fields[0].wire_value == 1 assert fields[0].wire_value == 1
assert fields[1].wire_value == 1 << 32 assert fields[1].wire_value == 1 << 32
as_bool = fields[1].decode_as(ProtoParser.DecodedTypes.bool) as_bool = fields[1].decode_as(ProtoParser.DecodedTypes.bool)
@ -402,14 +432,16 @@ def test_special_decoding():
assert fields[1].decode_as(ProtoParser.DecodedTypes.float) == 2.121995791e-314 assert fields[1].decode_as(ProtoParser.DecodedTypes.float) == 2.121995791e-314
assert fields[1].safe_decode_as(ProtoParser.DecodedTypes.uint32) == (ProtoParser.DecodedTypes.uint64, 1 << 32) assert fields[1].safe_decode_as(ProtoParser.DecodedTypes.uint32) == (ProtoParser.DecodedTypes.uint64, 1 << 32)
assert fields[0].safe_decode_as(ProtoParser.DecodedTypes.sfixed32) == (ProtoParser.DecodedTypes.uint32, 1) assert fields[0].safe_decode_as(ProtoParser.DecodedTypes.sfixed32) == (ProtoParser.DecodedTypes.uint32, 1)
assert fields[3].wire_type == GoogleProtobuf.Pair.WireTypes.bit_32 assert fields[3].wire_type == ProtoParser.WireTypes.bit_32
assert fields[4].wire_type == GoogleProtobuf.Pair.WireTypes.bit_64 assert fields[4].wire_type == ProtoParser.WireTypes.bit_64
# signed 32 bit int (standard encoding) # signed 32 bit int (standard encoding)
assert fields[5].safe_decode_as(ProtoParser.DecodedTypes.int32) == (ProtoParser.DecodedTypes.int32, -1) assert fields[5].safe_decode_as(ProtoParser.DecodedTypes.int32) == (ProtoParser.DecodedTypes.int32, -1)
# fixed (signed) 32bit int (ZigZag encoding) # fixed (signed) 32bit int (ZigZag encoding)
assert fields[5].safe_decode_as(ProtoParser.DecodedTypes.sint32) == (ProtoParser.DecodedTypes.sint32, -2147483648) assert fields[5].safe_decode_as(ProtoParser.DecodedTypes.sint32) == (ProtoParser.DecodedTypes.sint32, -2147483648)
# sint64 # sint64
assert fields[1].safe_decode_as(ProtoParser.DecodedTypes.sint64) == (ProtoParser.DecodedTypes.sint64, 2147483648) assert fields[1].safe_decode_as(ProtoParser.DecodedTypes.sint64) == (ProtoParser.DecodedTypes.sint64, 2147483648)
# int64
assert fields[1].safe_decode_as(ProtoParser.DecodedTypes.int64) == (ProtoParser.DecodedTypes.int64, 4294967296)
# varint 64bit to enum # varint 64bit to enum
assert fields[1].safe_decode_as(ProtoParser.DecodedTypes.enum) == (ProtoParser.DecodedTypes.enum, 4294967296) assert fields[1].safe_decode_as(ProtoParser.DecodedTypes.enum) == (ProtoParser.DecodedTypes.enum, 4294967296)
@ -449,8 +481,308 @@ def test_special_decoding():
fields[1].decode_as(ProtoParser.DecodedTypes.uint32) fields[1].decode_as(ProtoParser.DecodedTypes.uint32)
with pytest.raises(TypeError, match="can not be converted to floatingpoint representation"): with pytest.raises(TypeError, match="can not be converted to floatingpoint representation"):
fields[6]._wire_value_as_float() fields[6]._wire_value_as_float()
with pytest.raises(TypeError, match="wire value too large for int64"):
fields[7].decode_as(ProtoParser.DecodedTypes.int64)
with pytest.raises(TypeError, match="wire value too large"):
fields[7].decode_as(ProtoParser.DecodedTypes.uint64)
with pytest.raises(TypeError, match="wire value too large for sint64"):
fields[7].decode_as(ProtoParser.DecodedTypes.sint64)
with pytest.raises(ValueError, match="varint exceeds bounds of provided data"):
ProtoParser.read_fields(
wire_data=helper_encode_base128le(1 << 128),
options=ProtoParser.ParserOptions(),
parent_field=None,
rules=[]
)
with pytest.raises(ValueError, match="value exceeds 64bit, violating protobuf specs"):
fields = ProtoParser.read_fields(
wire_data=helper_gen_varint_msg_field(1, 1 << 128),
options=ProtoParser.ParserOptions(),
parent_field=None,
rules=[]
)
fields[0]._value_as_bytes()
with pytest.raises(ValueError, match=".* is not a valid .*WireTypes"):
ProtoParser.read_fields(
wire_data=helper_encode_base128le(0x7), # invalid wiretype 0x7
options=ProtoParser.ParserOptions(),
parent_field=None,
rules=[]
)
print(fields[6])
def test_view_protobuf_custom_config_packed(tdata):
# message with repeated field fixed64
msg_inner1 = helper_gen_bits64_msg_field(2, 12)
msg_inner1 += helper_gen_bits64_msg_field(2, 23)
msg_inner1 += helper_gen_bits64_msg_field(2, 456789012345678)
msg1 = helper_gen_lendel_msg_field(1, msg_inner1)
v = full_eval(ViewGrpcProtobuf())
view_text, output = v(msg1)
assert view_text == "Protobuf (flattened)"
output = list(output) # assure list conversion if generator
assert output == [
[('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')],
[('text', '[fixed64] '), ('text', ' '), ('text', '1.2 '), ('text', '12 ')],
[('text', '[fixed64] '), ('text', ' '), ('text', '1.2 '), ('text', '23 ')],
[('text', '[fixed64] '), ('text', ' '), ('text', '1.2 '), ('text', '456789012345678 ')]
]
# same message as above, but fixed64 values are packed
# Note: the decoded has no type indication, as packed values are always contained in
# a length delimited field. The packed fields contain no individual type header
# decoder has no knowledge of packed repeated field
msg_inner2 = helper_gen_bits64_msg_field_packed(2, [12, 23, 456789012345678])
msg2 = helper_gen_lendel_msg_field(1, msg_inner2)
view_text, output = v(msg2)
assert view_text == "Protobuf (flattened)"
output = list(output) # assure list conversion if generator
assert output == [
[('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], # noqa: E501
[('text', '[bytes] '), ('text', ' '), ('text', '1.2 '), ('text', "b'\\x0c\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x17\\x00\\x00\\x00\\x00\\x00\\x00\\x00Ns\\xd1zr\\x9f\\x01\\x00' ")] # noqa: E501
]
# decoder uses custom definition to decode as 1.2 as "packed, repeated fixed64"
view_config = ViewConfig(
parser_options=ProtoParser.ParserOptions(),
parser_rules=[
ProtoParser.ParserRule(
filter=".*",
name="parse packed field",
field_definitions=[
ProtoParser.ParserFieldDefinition(
name="packed repeated fixed64",
tag="1.2",
intended_decoding=ProtoParser.DecodedTypes.fixed64,
as_packed=True
)
]
)
]
)
v = full_eval(ViewGrpcProtobuf(view_config))
msg_inner2 = helper_gen_bits64_msg_field_packed(2, [12, 23, 456789012345678])
msg2 = helper_gen_lendel_msg_field(1, msg_inner2)
# provide the view a flow and response message dummies, to allow custom rules to work
view_text, output = v(msg2, flow=sim_flow, http_message=sim_flow.response)
assert view_text == "Protobuf (flattened)"
output = list(output) # assure list conversion if generator
assert output == [
[('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')],
[('text', '[fixed64] '), ('text', 'packed repeated fixed64 '), ('text', '1.2 '), ('text', '12 ')],
[('text', '[fixed64] '), ('text', 'packed repeated fixed64 '), ('text', '1.2 '), ('text', '23 ')],
[('text', '[fixed64] '), ('text', 'packed repeated fixed64 '), ('text', '1.2 '), ('text', '456789012345678 ')]
]
# message with packed repeated messages in field 1.5
# Note: protobuf v3 only allows packed encoding for scalar field types, but packed messages
# were spotted in traffic to google gRPC endpoints (f.e. https://play.googleapis.com/log/batch)
p_msg1 = helper_gen_lendel_msg_field(1, b"inner message 1")
p_msg1 += helper_gen_varint_msg_field(2, 1)
p_msg2 = helper_gen_lendel_msg_field(1, b"inner message 2")
p_msg2 += helper_gen_varint_msg_field(2, 2)
p_msg3 = helper_gen_lendel_msg_field(1, b"inner message 3")
p_msg3 += helper_gen_varint_msg_field(2, 3)
msg_inner3 = helper_gen_lendel_msg_field_packed(5, [p_msg1, p_msg2, p_msg3])
msg3 = helper_gen_lendel_msg_field(1, msg_inner3)
view_config = ViewConfig(
parser_options=ProtoParser.ParserOptions(),
parser_rules=[
ProtoParser.ParserRule(
filter=".*",
name="parse packed field",
field_definitions=[
ProtoParser.ParserFieldDefinition(
name="packed repeated message",
tag="1.5",
intended_decoding=ProtoParser.DecodedTypes.message,
as_packed=True
)
]
)
]
)
v = full_eval(ViewGrpcProtobuf(view_config))
# provide the view a flow and response message dummies, to allow custom rules to work
view_text, output = v(msg3, flow=sim_flow, http_message=sim_flow.response)
assert view_text == "Protobuf (flattened)"
output = list(output) # assure list conversion if generator
assert output == [
[('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')],
[('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '), ('text', ' ')],
[('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 1 ')],
[('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '1 ')],
[('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '), ('text', ' ')],
[('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 2 ')],
[('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '2 ')],
[('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '), ('text', ' ')],
[('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 3 ')],
[('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '3 ')]
]
# message with repeated messages in field 1.5 (not packed), has to be detected by failover parsing
msg_inner4 = helper_gen_lendel_msg_field(5, p_msg1)
msg_inner4 += helper_gen_lendel_msg_field(5, p_msg2)
msg_inner4 += helper_gen_lendel_msg_field(5, p_msg3)
msg4 = helper_gen_lendel_msg_field(1, msg_inner4)
view_config = ViewConfig(
parser_options=ProtoParser.ParserOptions(),
parser_rules=[
ProtoParser.ParserRule(
filter=".*",
name="parse packed field",
field_definitions=[
ProtoParser.ParserFieldDefinition(
name="packed repeated message",
tag="1.5",
intended_decoding=ProtoParser.DecodedTypes.message,
as_packed=True
)
]
)
]
)
v = full_eval(ViewGrpcProtobuf(view_config))
# provide the view a flow and response message dummies, to allow custom rules to work
view_text, output = v(msg4, flow=sim_flow, http_message=sim_flow.response)
assert view_text == "Protobuf (flattened)"
output = list(output) # assure list conversion if generator
assert output == [
[('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')],
[('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '), ('text', ' ')],
[('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 1 ')],
[('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '1 ')],
[('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '), ('text', ' ')],
[('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 2 ')],
[('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '2 ')],
[('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '), ('text', ' ')],
[('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 3 ')],
[('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '3 ')]
]
# packed bit32
msg_inner = helper_gen_bits32_msg_field_packed(2, [12, 23, 4567890])
msg = helper_gen_lendel_msg_field(1, msg_inner)
view_config = ViewConfig(
parser_options=ProtoParser.ParserOptions(),
parser_rules=[
ProtoParser.ParserRule(
filter=".*",
name="parse packed field",
field_definitions=[
ProtoParser.ParserFieldDefinition(
name="packed repeated fixed32",
tag="1.2",
intended_decoding=ProtoParser.DecodedTypes.fixed32,
as_packed=True
)
]
)
]
)
v = full_eval(ViewGrpcProtobuf(view_config))
# provide the view a flow and response message dummies, to allow custom rules to work
view_text, output = v(msg, flow=sim_flow, http_message=sim_flow.response)
assert view_text == "Protobuf (flattened)"
output = list(output) # assure list conversion if generator
assert output == [
[('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')],
[('text', '[fixed32] '), ('text', 'packed repeated fixed32 '), ('text', '1.2 '), ('text', '12 ')],
[('text', '[fixed32] '), ('text', 'packed repeated fixed32 '), ('text', '1.2 '), ('text', '23 ')],
[('text', '[fixed32] '), ('text', 'packed repeated fixed32 '), ('text', '1.2 '), ('text', '4567890 ')]
]
# packed bit32, invalid
msg_inner = helper_gen_bits32_msg_field_packed(2, [12, 23, 4567890]) + b"\x01" # data not divisible by 4
msg = helper_gen_lendel_msg_field(1, msg_inner)
view_config = ViewConfig(
parser_options=ProtoParser.ParserOptions(),
parser_rules=[
ProtoParser.ParserRule(
filter=".*",
name="parse packed field",
field_definitions=[
ProtoParser.ParserFieldDefinition(
name="packed repeated fixed32",
tag="1.2",
intended_decoding=ProtoParser.DecodedTypes.fixed32,
as_packed=True
)
]
)
]
)
v = full_eval(ViewGrpcProtobuf(view_config))
# provide the view a flow and response message dummies, to allow custom rules to work
view_text, output = v(msg, flow=sim_flow, http_message=sim_flow.response)
assert view_text == "Protobuf (flattened)"
output = list(output) # assure list conversion if generator
assert output == [
[('text', '[bytes] '), ('text', ' '), ('text', '1 '), ('text', "b'\\x12\\x0c\\x0c\\x00\\x00\\x00\\x17\\x00\\x00\\x00R\\xb3E\\x00\\x01' ")] # noqa: E501
]
# packed bit64, invalid
msg_inner = helper_gen_bits64_msg_field_packed(2, [12, 23, 4567890]) + b"\x01" # data not divisible by 8
msg = helper_gen_lendel_msg_field(1, msg_inner)
view_config = ViewConfig(
parser_options=ProtoParser.ParserOptions(),
parser_rules=[
ProtoParser.ParserRule(
filter=".*",
name="parse packed field",
field_definitions=[
ProtoParser.ParserFieldDefinition(
name="packed repeated fixed64",
tag="1.2",
intended_decoding=ProtoParser.DecodedTypes.fixed64,
as_packed=True
)
]
)
]
)
v = full_eval(ViewGrpcProtobuf(view_config))
# provide the view a flow and response message dummies, to allow custom rules to work
view_text, output = v(msg, flow=sim_flow, http_message=sim_flow.response)
assert view_text == "Protobuf (flattened)"
output = list(output) # assure list conversion if generator
assert output == [
[('text', '[bytes] '), ('text', ' '), ('text', '1 '), ('text', "b'\\x12\\x18\\x0c\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x17\\x00\\x00\\x00\\x00\\x00\\x00\\x00R\\xb3E\\x00\\x00\\x00\\x00\\x00\\x01'")] # noqa: E501
]
# packed varint
msg_inner = helper_gen_varint_msg_field_packed(2, [12, 23, 4567890])
msg = helper_gen_lendel_msg_field(1, msg_inner)
view_config = ViewConfig(
parser_options=ProtoParser.ParserOptions(),
parser_rules=[
ProtoParser.ParserRule(
filter=".*",
name="parse packed field",
field_definitions=[
ProtoParser.ParserFieldDefinition(
name="packed repeated varint",
tag="1.2",
intended_decoding=ProtoParser.DecodedTypes.uint32,
as_packed=True
)
]
)
]
)
v = full_eval(ViewGrpcProtobuf(view_config))
# provide the view a flow and response message dummies, to allow custom rules to work
view_text, output = v(msg, flow=sim_flow, http_message=sim_flow.response)
assert view_text == "Protobuf (flattened)"
output = list(output) # assure list conversion if generator
assert output == [
[('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')],
[('text', '[uint32] '), ('text', 'packed repeated varint '), ('text', '1.2 '), ('text', '12 ')],
[('text', '[uint32] '), ('text', 'packed repeated varint '), ('text', '1.2 '), ('text', '23 ')],
[('text', '[uint32] '), ('text', 'packed repeated varint '), ('text', '1.2 '), ('text', '4567890 ')]
]
def test_render_priority(): def test_render_priority():