diff --git a/mitmproxy/contentviews/grpc.py b/mitmproxy/contentviews/grpc.py index 4832d6d0b..a28a6821b 100644 --- a/mitmproxy/contentviews/grpc.py +++ b/mitmproxy/contentviews/grpc.py @@ -7,8 +7,6 @@ from typing import Dict, Generator, Iterable, Iterator, List, Optional, Tuple, U from mitmproxy import contentviews, ctx, flow, flowfilter, http from mitmproxy.contentviews import base -from mitmproxy.contrib.kaitaistruct.google_protobuf import GoogleProtobuf -from mitmproxy.contrib.kaitaistruct.vlq_base128_le import VlqBase128Le from mitmproxy.net.encoding import decode @@ -108,6 +106,9 @@ class ProtoParser: name: Optional[str] = None """optional: intended field for visualization (parser fails over to alternate decoding if not possible)""" + as_packed: Optional[bool] = False + """optional: if set to true, the field is considered to be repeated and packed""" + @dataclass class ParserOptions: # output should contain wiretype of fields @@ -143,115 +144,252 @@ class ProtoParser: string = 14 bytes = 15 message = 16 - packed_repeated_field = 17 + # helper - unknown = 18 + unknown = 17 - class Message: - def __init__( - self, - data: bytes, - options: ProtoParser.ParserOptions, - rules: List[ProtoParser.ParserRule], - parent_field: ProtoParser.Field = None, - ) -> None: - self.data: bytes = data - self.parent_field: Optional[ProtoParser.Field] = parent_field - self.options: ProtoParser.ParserOptions = options - self.rules: List[ProtoParser.ParserRule] = rules - try: - self.fields: List[ProtoParser.Field] = self.parse_message_fields(data) - except: - raise ValueError("not a valid protobuf message") + @staticmethod + def _read_base128le(data: bytes) -> Tuple[int, int]: + res = 0 + offset = 0 + while offset < len(data): + o = data[offset] + res += ((o & 0x7f) << (7 * offset)) + offset += 1 + if o < 0x80: + # the Kaitai parser for protobuf support base128 le values up + # to 8 groups (bytes). 
Due to the nature of the encoding, each + # group attributes 7bit to the resulting value, which give + # a 56 bit value at maximum. + # The values which get encoded into protobuf variable length integers, + # on the other hand, include full 64bit types (int64, uint64, sint64). + # This means, the Kaitai encoder can not cover the full range of + # possible values + # + # This decoder puts no limitation on the maximum value of variable + # length integers. Values exceeding 64bit have to be handled externally + return offset, res + raise ValueError("varint exceeds bounds of provided data") - def parse_message_fields(self, message: bytes) -> List: - res: List[ProtoParser.Field] = [] + @staticmethod + def _read_u32(data: bytes) -> Tuple[int, int]: + return 4, struct.unpack(" 32: - preferred_decoding = ProtoParser.DecodedTypes.uint64 - else: - preferred_decoding = ProtoParser.DecodedTypes.uint32 - if wt == GoogleProtobuf.Pair.WireTypes.bit_64: - # exists in Protobuf for efficient encoding, when decoded comes down to uint64 - assert isinstance(v, int) - preferred_decoding = ProtoParser.DecodedTypes.fixed64 - if wt == GoogleProtobuf.Pair.WireTypes.bit_32: - # exists in Protobuf for efficient encoding, when decoded comes down to uint32 - assert isinstance(v, int) - preferred_decoding = ProtoParser.DecodedTypes.fixed32 + @staticmethod + def _read_u64(data: bytes) -> Tuple[int, int]: + return 8, struct.unpack(" Generator[ProtoParser.Field, None, None]: - for f in self.fields: - yield f + @staticmethod + def read_fields( + wire_data: bytes, + parent_field: Optional[ProtoParser.Field], + options: ProtoParser.ParserOptions, + rules: List[ProtoParser.ParserRule] + ) -> List[ProtoParser.Field]: + res: List[ProtoParser.Field] = [] + pos = 0 + while pos < len(wire_data): + # read field key (tag and wire_type) + offset, key = ProtoParser._read_base128le(wire_data[pos:]) + # casting raises exception for invalid WireTypes + wt = ProtoParser.WireTypes((key & 7)) + tag = (key >> 3) + pos 
+= offset - def gen_flat_decoded_field_dicts(self) -> Generator[Dict, None, None]: - """ - This generator returns a flattened version of the fields from a message (including nested fields) - - A single entry has the form: - { - "tag": str # fully qualified tag (all tags starting from the root message, concatenated with '.' delimiter) - "wireType": str # describes the wire encoding used by the field - "decoding": str # describes the chosen decoding (interpretation of wire encoding, according to protobuf types) - "val": Union[bool, str, bytes, int, float] # the decoded value in python representation - } - """ - # iterate over fields - for f in self.gen_fields(): - # convert field and nested fields to dicts - for d in f.gen_flat_decoded_field_dicts(): - yield d - - def gen_string_rows(self) -> Generator[Tuple[str, ...], None, None]: - # Excluding fields containing message headers simplifies the view, but without - # knowing the message tags, they can not be used in a custom definition, in order - # to declare a different interpretation for the message (the message is a length-delimeted - # field value, which could alternatively be parsed as 'str' or 'bytes' if the field tag - # is known) - for field_dict in self.gen_flat_decoded_field_dicts(): - if self.options.exclude_message_headers and field_dict["decoding"] == "message": - continue - - if self.options.include_wiretype: - col1 = "[{}->{}]".format(field_dict["wireType"], field_dict["decoding"]) + val: Union[bytes, int] + preferred_decoding: ProtoParser.DecodedTypes + if wt == ProtoParser.WireTypes.varint: + offset, val = ProtoParser._read_base128le(wire_data[pos:]) + pos += offset + bl = val.bit_length() + if bl > 64: + preferred_decoding = ProtoParser.DecodedTypes.unknown + if bl > 32: + preferred_decoding = ProtoParser.DecodedTypes.uint64 else: - col1 = "[{}]".format(field_dict["decoding"]) - col2 = field_dict["name"] # empty string if not set (consumes no space) - col3 = field_dict["tag"] - col4 = 
str(field_dict["val"]) - yield col1, col2, col3, col4 + preferred_decoding = ProtoParser.DecodedTypes.uint32 + elif wt == ProtoParser.WireTypes.bit_64: + offset, val = ProtoParser._read_u64(wire_data[pos:]) + pos += offset + preferred_decoding = ProtoParser.DecodedTypes.fixed64 + elif wt == ProtoParser.WireTypes.len_delimited: + offset, length = ProtoParser._read_base128le(wire_data[pos:]) + pos += offset + if length > len(wire_data[pos:]): + raise ValueError("length delimited field exceeds data size") + val = wire_data[pos:pos + length] + pos += length + preferred_decoding = ProtoParser.DecodedTypes.message + elif ( + wt == ProtoParser.WireTypes.group_start or + wt == ProtoParser.WireTypes.group_end + ): + raise ValueError("deprecated field: {}".format(wt)) + elif wt == ProtoParser.WireTypes.bit_32: + offset, val = ProtoParser._read_u32(wire_data[pos:]) + pos += offset + preferred_decoding = ProtoParser.DecodedTypes.fixed32 + else: + # not reachable as if-else statements contain all possible WireTypes + # wrong types raise Exception during typecasting in `wt = ProtoParser.WireTypes((key & 7))` + raise ValueError("invalid WireType for protobuf messsage field") + + field = ProtoParser.Field( + wire_type=wt, + preferred_decoding=preferred_decoding, + options=options, + rules=rules, + tag=tag, + wire_value=val, + parent_field=parent_field + ) + res.append(field) + + return res + + @staticmethod + def read_packed_fields( + packed_field: ProtoParser.Field, + ) -> List[ProtoParser.Field]: + if not isinstance(packed_field.wire_value, bytes): + ctx.log(type(packed_field.wire_value)) + raise ValueError("can not unpack field with data other than bytes") + wire_data: bytes = packed_field.wire_value + tag: int = packed_field.tag + options: ProtoParser.ParserOptions = packed_field.options + rules: List[ProtoParser.ParserRule] = packed_field.rules + intended_decoding: ProtoParser.DecodedTypes = packed_field.preferred_decoding + + # the packed field has to have WireType length 
delimited, whereas the contained + # individual types have to have a different WireType, which is derived from + # the intended decoding + if ( + packed_field.wire_type != ProtoParser.WireTypes.len_delimited or + not isinstance(packed_field.wire_value, bytes) + ): + raise ValueError("packed fields have to be embedded in a length delimited message") + # wiretype to read has to be determined from intended decoding + packed_wire_type: ProtoParser.WireTypes + if ( + intended_decoding == ProtoParser.DecodedTypes.int32 or + intended_decoding == ProtoParser.DecodedTypes.int64 or + intended_decoding == ProtoParser.DecodedTypes.uint32 or + intended_decoding == ProtoParser.DecodedTypes.uint64 or + intended_decoding == ProtoParser.DecodedTypes.sint32 or + intended_decoding == ProtoParser.DecodedTypes.sint64 or + intended_decoding == ProtoParser.DecodedTypes.bool or + intended_decoding == ProtoParser.DecodedTypes.enum + ): + packed_wire_type = ProtoParser.WireTypes.varint + elif ( + intended_decoding == ProtoParser.DecodedTypes.fixed32 or + intended_decoding == ProtoParser.DecodedTypes.sfixed32 or + intended_decoding == ProtoParser.DecodedTypes.float + ): + packed_wire_type = ProtoParser.WireTypes.bit_32 + elif ( + intended_decoding == ProtoParser.DecodedTypes.fixed64 or + intended_decoding == ProtoParser.DecodedTypes.sfixed64 or + intended_decoding == ProtoParser.DecodedTypes.double + ): + packed_wire_type = ProtoParser.WireTypes.bit_64 + elif ( + intended_decoding == ProtoParser.DecodedTypes.string or + intended_decoding == ProtoParser.DecodedTypes.bytes or + intended_decoding == ProtoParser.DecodedTypes.message + ): + packed_wire_type = ProtoParser.WireTypes.len_delimited + else: + # should never happen, no test + raise TypeError("Wire type could not be determined from packed decoding type") + + res: List[ProtoParser.Field] = [] + pos = 0 + val: Union[bytes, int] + if packed_wire_type == ProtoParser.WireTypes.varint: + while pos < len(wire_data): + offset, val = 
ProtoParser._read_base128le(wire_data[pos:]) + pos += offset + res.append(ProtoParser.Field( + options=options, + preferred_decoding=intended_decoding, + rules=rules, + tag=tag, + wire_type=packed_wire_type, + wire_value=val, + parent_field=packed_field.parent_field, + is_unpacked_children=True + )) + elif packed_wire_type == ProtoParser.WireTypes.bit_64: + if len(wire_data) % 8 != 0: + raise ValueError("can not parse as packed bit64") + while pos < len(wire_data): + offset, val = ProtoParser._read_u64(wire_data[pos:]) + pos += offset + res.append(ProtoParser.Field( + options=options, + preferred_decoding=intended_decoding, + rules=rules, + tag=tag, + wire_type=packed_wire_type, + wire_value=val, + parent_field=packed_field.parent_field, + is_unpacked_children=True + )) + elif packed_wire_type == ProtoParser.WireTypes.len_delimited: + while pos < len(wire_data): + offset, length = ProtoParser._read_base128le(wire_data[pos:]) + pos += offset + val = wire_data[pos: pos + length] + if length > len(wire_data[pos:]): + raise ValueError("packed length delimited field exceeds data size") + res.append(ProtoParser.Field( + options=options, + preferred_decoding=intended_decoding, + rules=rules, + tag=tag, + wire_type=packed_wire_type, + wire_value=val, + parent_field=packed_field.parent_field, + is_unpacked_children=True + )) + pos += length + elif ( + packed_wire_type == ProtoParser.WireTypes.group_start or + packed_wire_type == ProtoParser.WireTypes.group_end + ): + raise ValueError("group tags can not be encoded packed") + elif packed_wire_type == ProtoParser.WireTypes.bit_32: + if len(wire_data) % 4 != 0: + raise ValueError("can not parse as packed bit32") + while pos < len(wire_data): + offset, val = ProtoParser._read_u32(wire_data[pos:]) + pos += offset + res.append(ProtoParser.Field( + options=options, + preferred_decoding=intended_decoding, + rules=rules, + tag=tag, + wire_type=packed_wire_type, + wire_value=val, + parent_field=packed_field.parent_field, + 
is_unpacked_children=True + )) + else: + # should never happen + raise ValueError("invalid WireType for protobuf messsage field") + + # mark parent field as packed parent (if we got here, unpacking succeeded) + packed_field.is_packed_parent = True + return res class Field: """ @@ -301,36 +439,46 @@ class ProtoParser: def __init__( self, - wire_type: GoogleProtobuf.Pair.WireTypes, + wire_type: ProtoParser.WireTypes, preferred_decoding: ProtoParser.DecodedTypes, tag: int, + parent_field: Optional[ProtoParser.Field], wire_value: Union[int, bytes], - owning_message: ProtoParser.Message, options: ProtoParser.ParserOptions, - rules: List[ProtoParser.ParserRule] + rules: List[ProtoParser.ParserRule], + is_unpacked_children: bool = False ) -> None: - self.wire_type: GoogleProtobuf.Pair.WireTypes = wire_type + self.wire_type: ProtoParser.WireTypes = wire_type self.preferred_decoding: ProtoParser.DecodedTypes = preferred_decoding self.wire_value: Union[int, bytes] = wire_value self.tag: int = tag - self.owning_message: ProtoParser.Message = owning_message self.options: ProtoParser.ParserOptions = options self.name: str = "" self.rules: List[ProtoParser.ParserRule] = rules - self.parent_tags: List[int] - if not self.owning_message.parent_field: - self.parent_tags = [] - else: - self.parent_tags = self.owning_message.parent_field.parent_tags[:] - self.parent_tags.append(self.owning_message.parent_field.tag) + self.parent_field: Optional[ProtoParser.Field] = parent_field + self.is_unpacked_children: bool = is_unpacked_children # marks field as being a result of unpacking + self.is_packed_parent: bool = False # marks field as being parent of successfully unpacked children + self.parent_tags: List[int] = [] + if self.parent_field is not None: + self.parent_tags = self.parent_field.parent_tags[:] + self.parent_tags.append(self.parent_field.tag) + self.try_unpack = False + # rules can overwrite self.try_unpack self.apply_rules() + # do not unpack fields which are the result of 
unpacking + if ( + parent_field is not None and + self.is_unpacked_children + ): + self.try_unpack = False # no tests for only_first_hit=False, as not user-changable def apply_rules(self, only_first_hit=True): tag_str = self._gen_tag_str() name = None decoding = None + as_packed = False try: for rule in self.rules: for fd in rule.field_definitions: @@ -347,17 +495,21 @@ class ProtoParser: # only first match self.name = fd.name self.preferred_decoding = fd.intended_decoding + self.try_unpack = fd.as_packed return else: # overwrite matches till last rule was inspected # (f.e. allows to define name in one rule and intended_decoding in another one) name = fd.name if fd.name else name decoding = fd.intended_decoding if fd.intended_decoding else decoding + if fd.as_packed: + as_packed = True if name: self.name = name if decoding: self.preferred_decoding = decoding + self.try_unpack = as_packed except Exception as e: ctx.log.warn(e) pass @@ -369,35 +521,36 @@ class ProtoParser: def safe_decode_as( self, - intended_decoding: ProtoParser.DecodedTypes - ) -> Tuple[ProtoParser.DecodedTypes, Union[bool, float, int, bytes, str, ProtoParser.Message]]: + intended_decoding: ProtoParser.DecodedTypes, + try_as_packed: bool = False + ) -> Tuple[ProtoParser.DecodedTypes, Union[bool, float, int, bytes, str, List[ProtoParser.Field]]]: """ Tries to decode as intended, applies failover, if not possible Returns selected decoding and decoded value """ - if self.wire_type == GoogleProtobuf.Pair.WireTypes.varint: + if self.wire_type == ProtoParser.WireTypes.varint: try: - return intended_decoding, self.decode_as(intended_decoding) + return intended_decoding, self.decode_as(intended_decoding, try_as_packed) except: if int(self.wire_value).bit_length() > 32: # ignore the fact that varint could exceed 64bit (would violate the specs) return ProtoParser.DecodedTypes.uint64, self.wire_value else: return ProtoParser.DecodedTypes.uint32, self.wire_value - elif self.wire_type == 
GoogleProtobuf.Pair.WireTypes.bit_64: + elif self.wire_type == ProtoParser.WireTypes.bit_64: try: - return intended_decoding, self.decode_as(intended_decoding) + return intended_decoding, self.decode_as(intended_decoding, try_as_packed) except: return ProtoParser.DecodedTypes.fixed64, self.wire_value - elif self.wire_type == GoogleProtobuf.Pair.WireTypes.bit_32: + elif self.wire_type == ProtoParser.WireTypes.bit_32: try: - return intended_decoding, self.decode_as(intended_decoding) + return intended_decoding, self.decode_as(intended_decoding, try_as_packed) except: return ProtoParser.DecodedTypes.fixed32, self.wire_value - elif self.wire_type == GoogleProtobuf.Pair.WireTypes.len_delimited: + elif self.wire_type == ProtoParser.WireTypes.len_delimited: try: - return intended_decoding, self.decode_as(intended_decoding) + return intended_decoding, self.decode_as(intended_decoding, try_as_packed) except: # failover strategy: message --> string (valid UTF-8) --> bytes len_delimited_strategy: List[ProtoParser.DecodedTypes] = [ @@ -406,12 +559,12 @@ class ProtoParser: ProtoParser.DecodedTypes.bytes # should always work ] for failover_decoding in len_delimited_strategy: - if failover_decoding == intended_decoding: - continue # don't try it twice + if failover_decoding == intended_decoding and not try_as_packed: + # don't try same decoding twice, unless first attempt was packed + continue try: - return failover_decoding, self.decode_as(failover_decoding) + return failover_decoding, self.decode_as(failover_decoding, False) except: - # move on with next pass # we should never get here (could not be added to tests) @@ -419,19 +572,23 @@ class ProtoParser: def decode_as( self, - intended_decoding: ProtoParser.DecodedTypes - ) -> Union[bool, int, float, bytes, str, ProtoParser.Message]: - if self.wire_type == GoogleProtobuf.Pair.WireTypes.varint: + intended_decoding: ProtoParser.DecodedTypes, + as_packed: bool = False + ) -> Union[bool, int, float, bytes, str, 
List[ProtoParser.Field]]: + if as_packed is True: + return ProtoParser.read_packed_fields(packed_field=self) + + if self.wire_type == ProtoParser.WireTypes.varint: assert isinstance(self.wire_value, int) if intended_decoding == ProtoParser.DecodedTypes.bool: - return self.wire_value != 0 + # clamp result to 64bit + return self.wire_value & 0xffffffffffffffff != 0 elif intended_decoding == ProtoParser.DecodedTypes.int32: if self.wire_value.bit_length() > 32: raise TypeError("wire value too large for int32") return struct.unpack("!i", struct.pack("!I", self.wire_value))[0] elif intended_decoding == ProtoParser.DecodedTypes.int64: if self.wire_value.bit_length() > 64: - # currently avoided by kaitai decoder (can not be added to tests) raise TypeError("wire value too large for int64") return struct.unpack("!q", struct.pack("!Q", self.wire_value))[0] elif intended_decoding == ProtoParser.DecodedTypes.uint32: @@ -443,7 +600,6 @@ class ProtoParser: intended_decoding == ProtoParser.DecodedTypes.enum ): if self.wire_value.bit_length() > 64: - # currently avoided by kaitai decoder (can not be added to tests) raise TypeError("wire value too large") return self.wire_value # already 'int' which was parsed as unsigned elif intended_decoding == ProtoParser.DecodedTypes.sint32: @@ -452,7 +608,6 @@ class ProtoParser: return (self.wire_value >> 1) ^ -(self.wire_value & 1) # zigzag_decode elif intended_decoding == ProtoParser.DecodedTypes.sint64: if self.wire_value.bit_length() > 64: - # currently avoided by kaitai decoder (can not be added to tests) raise TypeError("wire value too large for sint64") # ZigZag decode # Ref: https://gist.github.com/mfuerstenau/ba870a29e16536fdbaba @@ -463,37 +618,35 @@ class ProtoParser: ): # special case, not complying to protobuf specs return self._wire_value_as_float() - elif self.wire_type == GoogleProtobuf.Pair.WireTypes.bit_64: + elif self.wire_type == ProtoParser.WireTypes.bit_64: if intended_decoding == ProtoParser.DecodedTypes.fixed64: return 
self.wire_value elif intended_decoding == ProtoParser.DecodedTypes.sfixed64: return struct.unpack("!q", struct.pack("!Q", self.wire_value))[0] elif intended_decoding == ProtoParser.DecodedTypes.double: return self._wire_value_as_float() - elif self.wire_type == GoogleProtobuf.Pair.WireTypes.bit_32: + elif self.wire_type == ProtoParser.WireTypes.bit_32: if intended_decoding == ProtoParser.DecodedTypes.fixed32: return self.wire_value elif intended_decoding == ProtoParser.DecodedTypes.sfixed32: return struct.unpack("!i", struct.pack("!I", self.wire_value))[0] elif intended_decoding == ProtoParser.DecodedTypes.float: return self._wire_value_as_float() - elif self.wire_type == GoogleProtobuf.Pair.WireTypes.len_delimited: + elif self.wire_type == ProtoParser.WireTypes.len_delimited: assert isinstance(self.wire_value, bytes) if intended_decoding == ProtoParser.DecodedTypes.string: # According to specs, a protobuf string HAS TO be UTF-8 parsable # throw exception on invalid UTF-8 chars, but escape linebreaks - return self.wire_value_as_utf8(escape_invalid=False, escape_newline=True) + return self.wire_value_as_utf8(escape_newline=True) elif intended_decoding == ProtoParser.DecodedTypes.bytes: # always works, assure to hand back a copy return self.wire_value[:] - elif intended_decoding == ProtoParser.DecodedTypes.packed_repeated_field: - raise NotImplementedError("currently not needed") elif intended_decoding == ProtoParser.DecodedTypes.message: - return ProtoParser.Message( - data=self.wire_value, - options=self.options, + return ProtoParser.read_fields( + wire_data=self.wire_value, parent_field=self, - rules=self.rules + options=self.options, + rules=self.rules, ) # if here, there is no valid decoding @@ -533,7 +686,7 @@ class ProtoParser: if self.wire_value.bit_length() > 64: # source for a python int are wiretypes varint/bit_32/bit64 and should never convert to int values 64bit # currently avoided by kaitai decoder (can not be added to tests) - raise ValueError("Value 
exceeds 64bit, violating protobuf specs") + raise ValueError("value exceeds 64bit, violating protobuf specs") elif self.wire_value.bit_length() > 32: # packing uses network byte order (to assure consistent results across architectures) return struct.pack("!Q", self.wire_value) @@ -550,12 +703,9 @@ class ProtoParser: def _decoding_str(self, decoding: ProtoParser.DecodedTypes): return str(decoding).split(".")[-1] - def wire_value_as_utf8(self, escape_invalid=True, escape_newline=True) -> str: + def wire_value_as_utf8(self, escape_newline=True) -> str: if isinstance(self.wire_value, bytes): - if escape_invalid: - res = self.wire_value.decode("utf-8", "backslashreplace") - else: - res = self.wire_value.decode("utf-8") + res = self.wire_value.decode("utf-8") return res.replace("\n", "\\n") if escape_newline else res return str(self.wire_value) @@ -568,18 +718,23 @@ class ProtoParser: If the field holds a nested message, the fields contained in the message are appended. Ultimately this flattens all fields recursively. 
""" - selected_decoding, decoded_val = self.safe_decode_as(self.preferred_decoding) + selected_decoding, decoded_val = self.safe_decode_as(self.preferred_decoding, self.try_unpack) field_desc_dict = { "tag": self._gen_tag_str(), "wireType": self._wire_type_str(), "decoding": self._decoding_str(selected_decoding), "name": self.name, } - if isinstance(decoded_val, ProtoParser.Message): - field_desc_dict["val"] = "" # message has no value, because contained fields get appended (flattened) - yield field_desc_dict - # the value is an embedded message, thus add the message fields - for f in decoded_val.gen_fields(): + if isinstance(decoded_val, list): + if ( + selected_decoding == ProtoParser.DecodedTypes.message # field is a message with subfields + and not self.is_packed_parent # field is a message, but replaced by packed fields + ): + # Field is a message, not packed, thus include it as message header + field_desc_dict["val"] = "" + yield field_desc_dict + # add sub-fields of messages or packed fields + for f in decoded_val: for field_dict in f.gen_flat_decoded_field_dicts(): yield field_dict else: @@ -599,15 +754,35 @@ class ProtoParser: if rules is None: rules = [] self.rules = rules - self.root_message: ProtoParser.Message = ProtoParser.Message( - data=data, - options=self.options, - rules=self.rules - ) + + try: + self.root_fields: List[ProtoParser.Field] = ProtoParser.read_fields( + wire_data=self.data, + options=self.options, + parent_field=None, + rules=self.rules + ) + except Exception as e: + raise ValueError("not a valid protobuf message") from e + + def gen_flat_decoded_field_dicts(self) -> Generator[Dict, None, None]: + for f in self.root_fields: + for field_dict in f.gen_flat_decoded_field_dicts(): + yield field_dict def gen_str_rows(self) -> Generator[Tuple[str, ...], None, None]: - for f in self.root_message.gen_string_rows(): - yield f + for field_dict in self.gen_flat_decoded_field_dicts(): + if self.options.exclude_message_headers and 
field_dict["decoding"] == "message": + continue + + if self.options.include_wiretype: + col1 = "[{}->{}]".format(field_dict["wireType"], field_dict["decoding"]) + else: + col1 = "[{}]".format(field_dict["decoding"]) + col2 = field_dict["name"] # empty string if not set (consumes no space) + col3 = field_dict["tag"] + col4 = str(field_dict["val"]) + yield col1, col2, col3, col4 # Note: all content view formating functionality is kept out of the ProtoParser class, to diff --git a/test/mitmproxy/contentviews/test_grpc.py b/test/mitmproxy/contentviews/test_grpc.py index a0fc963db..cde774a7f 100644 --- a/test/mitmproxy/contentviews/test_grpc.py +++ b/test/mitmproxy/contentviews/test_grpc.py @@ -1,5 +1,6 @@ import pytest +from typing import List from mitmproxy.contentviews import grpc from mitmproxy.contentviews.grpc import ViewGrpcProtobuf, ViewConfig, ProtoParser, parse_grpc_messages from mitmproxy.net.encoding import encode @@ -353,7 +354,7 @@ def helper_gen_bits32_msg_field(f_idx: int, f_val: int): def helper_gen_bits64_msg_field(f_idx: int, f_val: int): # manual encoding of protobuf data - f_wt = 1 # field type 1 (bits32) + f_wt = 1 # field type 1 (bits64) tag = (f_idx << 3) | f_wt # combined tag msg = helper_encode_base128le(tag) # add encoded tag to message msg = msg + struct.pack(" 32bit msg += helper_gen_varint_msg_field(3, 1 << 64) # varint > 64bit (returned as 0x0 by Kaitai protobuf decoder) @@ -380,17 +412,15 @@ def test_special_decoding(): msg += helper_gen_bits64_msg_field(5, 0xbff199999999999a) # bits64 msg += helper_gen_varint_msg_field(6, 0xffffffff) # 32 bit varint negative msg += helper_gen_lendel_msg_field(7, b"hello world") # length delimted message, UTF-8 parsable + msg += helper_gen_varint_msg_field(8, 1 << 128) # oversized varint - parser = ProtoParser.Message( + parser = ProtoParser( data=msg, - parent_field=[], - rules=[], - options=ProtoParser.ParserOptions() + parser_options=ProtoParser.ParserOptions(), + rules=[] ) - # 
print(list(parser.gen_string_rows())) - # return - fields = list(parser.gen_fields()) + fields = parser.root_fields assert fields[0].wire_value == 1 assert fields[1].wire_value == 1 << 32 as_bool = fields[1].decode_as(ProtoParser.DecodedTypes.bool) @@ -402,14 +432,16 @@ def test_special_decoding(): assert fields[1].decode_as(ProtoParser.DecodedTypes.float) == 2.121995791e-314 assert fields[1].safe_decode_as(ProtoParser.DecodedTypes.uint32) == (ProtoParser.DecodedTypes.uint64, 1 << 32) assert fields[0].safe_decode_as(ProtoParser.DecodedTypes.sfixed32) == (ProtoParser.DecodedTypes.uint32, 1) - assert fields[3].wire_type == GoogleProtobuf.Pair.WireTypes.bit_32 - assert fields[4].wire_type == GoogleProtobuf.Pair.WireTypes.bit_64 + assert fields[3].wire_type == ProtoParser.WireTypes.bit_32 + assert fields[4].wire_type == ProtoParser.WireTypes.bit_64 # signed 32 bit int (standard encoding) assert fields[5].safe_decode_as(ProtoParser.DecodedTypes.int32) == (ProtoParser.DecodedTypes.int32, -1) # fixed (signed) 32bit int (ZigZag encoding) assert fields[5].safe_decode_as(ProtoParser.DecodedTypes.sint32) == (ProtoParser.DecodedTypes.sint32, -2147483648) # sint64 assert fields[1].safe_decode_as(ProtoParser.DecodedTypes.sint64) == (ProtoParser.DecodedTypes.sint64, 2147483648) + # int64 + assert fields[1].safe_decode_as(ProtoParser.DecodedTypes.int64) == (ProtoParser.DecodedTypes.int64, 4294967296) # varint 64bit to enum assert fields[1].safe_decode_as(ProtoParser.DecodedTypes.enum) == (ProtoParser.DecodedTypes.enum, 4294967296) @@ -449,8 +481,308 @@ def test_special_decoding(): fields[1].decode_as(ProtoParser.DecodedTypes.uint32) with pytest.raises(TypeError, match="can not be converted to floatingpoint representation"): fields[6]._wire_value_as_float() + with pytest.raises(TypeError, match="wire value too large for int64"): + fields[7].decode_as(ProtoParser.DecodedTypes.int64) + with pytest.raises(TypeError, match="wire value too large"): + 
fields[7].decode_as(ProtoParser.DecodedTypes.uint64) + with pytest.raises(TypeError, match="wire value too large for sint64"): + fields[7].decode_as(ProtoParser.DecodedTypes.sint64) + with pytest.raises(ValueError, match="varint exceeds bounds of provided data"): + ProtoParser.read_fields( + wire_data=helper_encode_base128le(1 << 128), + options=ProtoParser.ParserOptions(), + parent_field=None, + rules=[] + ) + with pytest.raises(ValueError, match="value exceeds 64bit, violating protobuf specs"): + fields = ProtoParser.read_fields( + wire_data=helper_gen_varint_msg_field(1, 1 << 128), + options=ProtoParser.ParserOptions(), + parent_field=None, + rules=[] + ) + fields[0]._value_as_bytes() + with pytest.raises(ValueError, match=".* is not a valid .*WireTypes"): + ProtoParser.read_fields( + wire_data=helper_encode_base128le(0x7), # invalid wiretype 0x7 + options=ProtoParser.ParserOptions(), + parent_field=None, + rules=[] + ) - print(fields[6]) + +def test_view_protobuf_custom_config_packed(tdata): + # message with a repeated fixed64 field (tag 2), each value encoded as a separate field (not packed) + msg_inner1 = helper_gen_bits64_msg_field(2, 12) + msg_inner1 += helper_gen_bits64_msg_field(2, 23) + msg_inner1 += helper_gen_bits64_msg_field(2, 456789012345678) + msg1 = helper_gen_lendel_msg_field(1, msg_inner1) + + v = full_eval(ViewGrpcProtobuf()) + view_text, output = v(msg1) + assert view_text == "Protobuf (flattened)" + output = list(output) # ensure list conversion if generator + assert output == [ + [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], + [('text', '[fixed64] '), ('text', ' '), ('text', '1.2 '), ('text', '12 ')], + [('text', '[fixed64] '), ('text', ' '), ('text', '1.2 '), ('text', '23 ')], + [('text', '[fixed64] '), ('text', ' '), ('text', '1.2 '), ('text', '456789012345678 ')] + ] + + # same message as above, but fixed64 values are packed + # Note: the decoder has no type indication, as packed values are always contained in + # a length-delimited field.
The packed values carry no individual type header + + # decoder has no knowledge of the packed repeated field, so the payload is expected to be shown as raw bytes + msg_inner2 = helper_gen_bits64_msg_field_packed(2, [12, 23, 456789012345678]) + msg2 = helper_gen_lendel_msg_field(1, msg_inner2) + view_text, output = v(msg2) + assert view_text == "Protobuf (flattened)" + output = list(output) # ensure list conversion if generator + assert output == [ + [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], # noqa: E501 + [('text', '[bytes] '), ('text', ' '), ('text', '1.2 '), ('text', "b'\\x0c\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x17\\x00\\x00\\x00\\x00\\x00\\x00\\x00Ns\\xd1zr\\x9f\\x01\\x00' ")] # noqa: E501 + ] + + # decoder uses a custom definition to decode field 1.2 as "packed, repeated fixed64" + view_config = ViewConfig( + parser_options=ProtoParser.ParserOptions(), + parser_rules=[ + ProtoParser.ParserRule( + filter=".*", + name="parse packed field", + field_definitions=[ + ProtoParser.ParserFieldDefinition( + name="packed repeated fixed64", + tag="1.2", + intended_decoding=ProtoParser.DecodedTypes.fixed64, + as_packed=True + ) + ] + ) + ] + ) + v = full_eval(ViewGrpcProtobuf(view_config)) + msg_inner2 = helper_gen_bits64_msg_field_packed(2, [12, 23, 456789012345678]) + msg2 = helper_gen_lendel_msg_field(1, msg_inner2) + # provide the view with a dummy flow and response message, to allow custom rules to work + view_text, output = v(msg2, flow=sim_flow, http_message=sim_flow.response) + assert view_text == "Protobuf (flattened)" + output = list(output) # ensure list conversion if generator + assert output == [ + [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], + [('text', '[fixed64] '), ('text', 'packed repeated fixed64 '), ('text', '1.2 '), ('text', '12 ')], + [('text', '[fixed64] '), ('text', 'packed repeated fixed64 '), ('text', '1.2 '), ('text', '23 ')], + [('text', '[fixed64] '), ('text', 'packed repeated fixed64 '), ('text', '1.2 '), ('text', '456789012345678 ')] + ] + + #
message with packed repeated messages in field 1.5 + # Note: protobuf v3 only allows packed encoding for scalar field types, but packed messages + # were spotted in traffic to Google gRPC endpoints (e.g. https://play.googleapis.com/log/batch) + p_msg1 = helper_gen_lendel_msg_field(1, b"inner message 1") + p_msg1 += helper_gen_varint_msg_field(2, 1) + p_msg2 = helper_gen_lendel_msg_field(1, b"inner message 2") + p_msg2 += helper_gen_varint_msg_field(2, 2) + p_msg3 = helper_gen_lendel_msg_field(1, b"inner message 3") + p_msg3 += helper_gen_varint_msg_field(2, 3) + msg_inner3 = helper_gen_lendel_msg_field_packed(5, [p_msg1, p_msg2, p_msg3]) + msg3 = helper_gen_lendel_msg_field(1, msg_inner3) + view_config = ViewConfig( + parser_options=ProtoParser.ParserOptions(), + parser_rules=[ + ProtoParser.ParserRule( + filter=".*", + name="parse packed field", + field_definitions=[ + ProtoParser.ParserFieldDefinition( + name="packed repeated message", + tag="1.5", + intended_decoding=ProtoParser.DecodedTypes.message, + as_packed=True + ) + ] + ) + ] + ) + v = full_eval(ViewGrpcProtobuf(view_config)) + # provide the view with a dummy flow and response message, to allow custom rules to work + view_text, output = v(msg3, flow=sim_flow, http_message=sim_flow.response) + assert view_text == "Protobuf (flattened)" + output = list(output) # ensure list conversion if generator + assert output == [ + [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], + [('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '), ('text', ' ')], + [('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 1 ')], + [('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '1 ')], + [('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '), ('text', ' ')], + [('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 2 ')], + [('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text',
'2 ')], + [('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '), ('text', ' ')], + [('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 3 ')], + [('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '3 ')] + ] + + # message with repeated messages in field 1.5 (not packed, although the rule below sets as_packed=True); has to be detected by failover parsing + msg_inner4 = helper_gen_lendel_msg_field(5, p_msg1) + msg_inner4 += helper_gen_lendel_msg_field(5, p_msg2) + msg_inner4 += helper_gen_lendel_msg_field(5, p_msg3) + msg4 = helper_gen_lendel_msg_field(1, msg_inner4) + view_config = ViewConfig( + parser_options=ProtoParser.ParserOptions(), + parser_rules=[ + ProtoParser.ParserRule( + filter=".*", + name="parse packed field", + field_definitions=[ + ProtoParser.ParserFieldDefinition( + name="packed repeated message", + tag="1.5", + intended_decoding=ProtoParser.DecodedTypes.message, + as_packed=True + ) + ] + ) + ] + ) + v = full_eval(ViewGrpcProtobuf(view_config)) + # provide the view with a dummy flow and response message, to allow custom rules to work + view_text, output = v(msg4, flow=sim_flow, http_message=sim_flow.response) + assert view_text == "Protobuf (flattened)" + output = list(output) # ensure list conversion if generator + assert output == [ + [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], + [('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '), ('text', ' ')], + [('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 1 ')], + [('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '1 ')], + [('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '), ('text', ' ')], + [('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 2 ')], + [('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '2 ')], + [('text', '[message] '), ('text', 'packed repeated message '), ('text', '1.5 '),
('text', ' ')], + [('text', '[string] '), ('text', ' '), ('text', '1.5.1 '), ('text', 'inner message 3 ')], + [('text', '[uint32] '), ('text', ' '), ('text', '1.5.2 '), ('text', '3 ')] + ] + + # packed repeated fixed32 (32-bit values) + msg_inner = helper_gen_bits32_msg_field_packed(2, [12, 23, 4567890]) + msg = helper_gen_lendel_msg_field(1, msg_inner) + view_config = ViewConfig( + parser_options=ProtoParser.ParserOptions(), + parser_rules=[ + ProtoParser.ParserRule( + filter=".*", + name="parse packed field", + field_definitions=[ + ProtoParser.ParserFieldDefinition( + name="packed repeated fixed32", + tag="1.2", + intended_decoding=ProtoParser.DecodedTypes.fixed32, + as_packed=True + ) + ] + ) + ] + ) + v = full_eval(ViewGrpcProtobuf(view_config)) + # provide the view with a dummy flow and response message, to allow custom rules to work + view_text, output = v(msg, flow=sim_flow, http_message=sim_flow.response) + assert view_text == "Protobuf (flattened)" + output = list(output) # ensure list conversion if generator + assert output == [ + [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], + [('text', '[fixed32] '), ('text', 'packed repeated fixed32 '), ('text', '1.2 '), ('text', '12 ')], + [('text', '[fixed32] '), ('text', 'packed repeated fixed32 '), ('text', '1.2 '), ('text', '23 ')], + [('text', '[fixed32] '), ('text', 'packed repeated fixed32 '), ('text', '1.2 '), ('text', '4567890 ')] + ] + + # packed repeated fixed32, invalid: field 1 is expected to be rendered as raw bytes + msg_inner = helper_gen_bits32_msg_field_packed(2, [12, 23, 4567890]) + b"\x01" # trailing byte: data not divisible by 4 + msg = helper_gen_lendel_msg_field(1, msg_inner) + view_config = ViewConfig( + parser_options=ProtoParser.ParserOptions(), + parser_rules=[ + ProtoParser.ParserRule( + filter=".*", + name="parse packed field", + field_definitions=[ + ProtoParser.ParserFieldDefinition( + name="packed repeated fixed32", + tag="1.2", + intended_decoding=ProtoParser.DecodedTypes.fixed32, + as_packed=True + ) + ] + ) + ] + ) + v =
full_eval(ViewGrpcProtobuf(view_config)) + # provide the view with a dummy flow and response message, to allow custom rules to work + view_text, output = v(msg, flow=sim_flow, http_message=sim_flow.response) + assert view_text == "Protobuf (flattened)" + output = list(output) # ensure list conversion if generator + assert output == [ + [('text', '[bytes] '), ('text', ' '), ('text', '1 '), ('text', "b'\\x12\\x0c\\x0c\\x00\\x00\\x00\\x17\\x00\\x00\\x00R\\xb3E\\x00\\x01' ")] # noqa: E501 + ] + + # packed repeated fixed64, invalid: field 1 is expected to be rendered as raw bytes + msg_inner = helper_gen_bits64_msg_field_packed(2, [12, 23, 4567890]) + b"\x01" # trailing byte: data not divisible by 8 + msg = helper_gen_lendel_msg_field(1, msg_inner) + view_config = ViewConfig( + parser_options=ProtoParser.ParserOptions(), + parser_rules=[ + ProtoParser.ParserRule( + filter=".*", + name="parse packed field", + field_definitions=[ + ProtoParser.ParserFieldDefinition( + name="packed repeated fixed64", + tag="1.2", + intended_decoding=ProtoParser.DecodedTypes.fixed64, + as_packed=True + ) + ] + ) + ] + ) + v = full_eval(ViewGrpcProtobuf(view_config)) + # provide the view with a dummy flow and response message, to allow custom rules to work + view_text, output = v(msg, flow=sim_flow, http_message=sim_flow.response) + assert view_text == "Protobuf (flattened)" + output = list(output) # ensure list conversion if generator + assert output == [ + [('text', '[bytes] '), ('text', ' '), ('text', '1 '), ('text', "b'\\x12\\x18\\x0c\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x17\\x00\\x00\\x00\\x00\\x00\\x00\\x00R\\xb3E\\x00\\x00\\x00\\x00\\x00\\x01'")] # noqa: E501 + ] + + # packed repeated varint, decoded as uint32 via the custom rule + msg_inner = helper_gen_varint_msg_field_packed(2, [12, 23, 4567890]) + msg = helper_gen_lendel_msg_field(1, msg_inner) + view_config = ViewConfig( + parser_options=ProtoParser.ParserOptions(), + parser_rules=[ + ProtoParser.ParserRule( + filter=".*", + name="parse packed field", + field_definitions=[ + ProtoParser.ParserFieldDefinition( + name="packed repeated varint", + tag="1.2", +
intended_decoding=ProtoParser.DecodedTypes.uint32, + as_packed=True + ) + ] + ) + ] + ) + v = full_eval(ViewGrpcProtobuf(view_config)) + # provide the view with a dummy flow and response message, to allow custom rules to work + view_text, output = v(msg, flow=sim_flow, http_message=sim_flow.response) + assert view_text == "Protobuf (flattened)" + output = list(output) # ensure list conversion if generator + assert output == [ + [('text', '[message] '), ('text', ' '), ('text', '1 '), ('text', ' ')], + [('text', '[uint32] '), ('text', 'packed repeated varint '), ('text', '1.2 '), ('text', '12 ')], + [('text', '[uint32] '), ('text', 'packed repeated varint '), ('text', '1.2 '), ('text', '23 ')], + [('text', '[uint32] '), ('text', 'packed repeated varint '), ('text', '1.2 '), ('text', '4567890 ')] + ] def test_render_priority():