diff --git a/examples/complex/xss_scanner.py b/examples/complex/xss_scanner.py
new file mode 100755
index 000000000..a0572d5df
--- /dev/null
+++ b/examples/complex/xss_scanner.py
@@ -0,0 +1,407 @@
+"""
+
+ __ __ _____ _____ _____
+ \ \ / // ____/ ____| / ____|
+ \ V /| (___| (___ | (___ ___ __ _ _ __ _ __ ___ _ __
+ > < \___ \\___ \ \___ \ / __/ _` | '_ \| '_ \ / _ \ '__|
+ / . \ ____) |___) | ____) | (_| (_| | | | | | | | __/ |
+ /_/ \_\_____/_____/ |_____/ \___\__,_|_| |_|_| |_|\___|_|
+
+
+This script automatically scans all visited webpages for XSS and SQLi vulnerabilities.
+
+Usage: mitmproxy -s xss_scanner.py
+
+This script scans for vulnerabilities by injecting a fuzzing payload (see PAYLOAD below) into 4 different places
+and examining the HTML to look for XSS and SQLi injection vulnerabilities. The XSS scanning functionality works by
+looking to see whether it is possible to inject HTML based on where the payload appears in the page and what
+characters are escaped. In addition, it also looks for any script tags that load javascript from unclaimed domains.
+The SQLi scanning functionality works by using regular expressions to look for errors from a number of different
+common databases. Since it is only looking for errors, it will not find blind SQLi vulnerabilities.
+
+The 4 places it injects the payload into are:
+1. URLs (e.g. https://example.com/ -> https://example.com/PAYLOAD/)
+2. Queries (e.g. https://example.com/index.html?a=b -> https://example.com/index.html?a=PAYLOAD)
+3. Referers (e.g. The referer changes from https://example.com to PAYLOAD)
+4. User Agents (e.g. The UA changes from Chrome to PAYLOAD)
+
+Reports from this script show up in the event log (viewable by pressing e) and are formatted like:
+
+===== XSS Found ====
+XSS URL: http://daviddworken.com/vulnerableUA.php
+Injection Point: User Agent
+Suggested Exploit: <script>alert(0)</script>
+Line: 1029zxcs'd"ao<ac>so[sb]po(pc)se;sl/bsl\eq=3847asd
+
+"""
+
+from mitmproxy import ctx
+from socket import gaierror, gethostbyname
+from urllib.parse import urlparse
+import requests
+import re
+from html.parser import HTMLParser
+from mitmproxy import http
+from typing import Dict, Union, Tuple, Optional, List, NamedTuple
+
+# The actual payload is put between a frontWall and a backWall to make it easy
+# to locate the payload with regular expressions
+FRONT_WALL = b"1029zxc"
+BACK_WALL = b"3847asd"
+PAYLOAD = b"""s'd"ao<ac>so[sb]po(pc)se;sl/bsl\\eq="""
+FULL_PAYLOAD = FRONT_WALL + PAYLOAD + BACK_WALL
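+# For reference, FULL_PAYLOAD decodes to:
+#   1029zxcs'd"ao<ac>so[sb]po(pc)se;sl/bsl\eq=3847asd
+# Each marker between the walls stands for one character the scanner cares about
+# (ao<ac / c>so for angle brackets, s'd for a single quote, d"ao for a double quote,
+# sl/bsl for a forward slash, se;sl for a semicolon, eq= for an equals sign). A
+# marker only survives in the response if the server reflected that character
+# unescaped, which is what get_XSS_data() checks below.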
+
+# A XSSData is a named tuple with the following fields:
+# - url -> str
+# - injection_point -> str
+# - exploit -> str
+# - line -> str
+XSSData = NamedTuple('XSSData', [('url', str),
+ ('injection_point', str),
+ ('exploit', str),
+ ('line', str)])
+
+# A SQLiData is a named tuple with the following fields:
+# - url -> str
+# - injection_point -> str
+# - regex -> str
+# - dbms -> str
+SQLiData = NamedTuple('SQLiData', [('url', str),
+ ('injection_point', str),
+ ('regex', str),
+ ('dbms', str)])
+
+
+VulnData = Tuple[Optional[XSSData], Optional[SQLiData]]
+Cookies = Dict[str, str]
+
+
+def get_cookies(flow: http.HTTPFlow) -> Cookies:
+    """ Return a dict going from cookie names to cookie values
+        - Note that it only includes the cookies sent in the original request,
+        not any cookies set by the server in its response """
+ return {name: value for name, value in flow.request.cookies.fields}
+
+
+def find_unclaimed_URLs(body: Union[str, bytes], requestUrl: bytes) -> None:
+ """ Look for unclaimed URLs in script tags and log them if found"""
+ class ScriptURLExtractor(HTMLParser):
+ script_URLs = []
+
+ def handle_starttag(self, tag, attrs):
+ if tag == "script" and "src" in [name for name, value in attrs]:
+ for name, value in attrs:
+ if name == "src":
+ self.script_URLs.append(value)
+
+ parser = ScriptURLExtractor()
+ try:
+ parser.feed(body)
+ except TypeError:
+ parser.feed(body.decode('utf-8'))
+ for url in parser.script_URLs:
+        parsed_url = urlparse(url)
+        domain = parsed_url.netloc
+ try:
+ gethostbyname(domain)
+ except gaierror:
+ ctx.log.error("XSS found in %s due to unclaimed URL \"%s\" in script tag." % (requestUrl, url))
+
+
+def test_end_of_URL_injection(original_body: str, request_URL: str, cookies: Cookies) -> VulnData:
+ """ Test the given URL for XSS via injection onto the end of the URL and
+ log the XSS if found """
+ parsed_URL = urlparse(request_URL)
+ path = parsed_URL.path
+ if path != "" and path[-1] != "/": # ensure the path ends in a /
+ path += "/"
+ path += FULL_PAYLOAD.decode('utf-8') # the path must be a string while the payload is bytes
+ url = parsed_URL._replace(path=path).geturl()
+ body = requests.get(url, cookies=cookies).text.lower()
+ xss_info = get_XSS_data(body, url, "End of URL")
+ sqli_info = get_SQLi_data(body, original_body, url, "End of URL")
+ return xss_info, sqli_info
+
+
+def test_referer_injection(original_body: str, request_URL: str, cookies: Cookies) -> VulnData:
+ """ Test the given URL for XSS via injection into the referer and
+ log the XSS if found """
+ body = requests.get(request_URL, headers={'referer': FULL_PAYLOAD}, cookies=cookies).text.lower()
+ xss_info = get_XSS_data(body, request_URL, "Referer")
+ sqli_info = get_SQLi_data(body, original_body, request_URL, "Referer")
+ return xss_info, sqli_info
+
+
+def test_user_agent_injection(original_body: str, request_URL: str, cookies: Cookies) -> VulnData:
+ """ Test the given URL for XSS via injection into the user agent and
+ log the XSS if found """
+ body = requests.get(request_URL, headers={'User-Agent': FULL_PAYLOAD}, cookies=cookies).text.lower()
+ xss_info = get_XSS_data(body, request_URL, "User Agent")
+ sqli_info = get_SQLi_data(body, original_body, request_URL, "User Agent")
+ return xss_info, sqli_info
+
+
+def test_query_injection(original_body: str, request_URL: str, cookies: Cookies) -> VulnData:
+ """ Test the given URL for XSS via injection into URL queries and
+ log the XSS if found """
+ parsed_URL = urlparse(request_URL)
+ query_string = parsed_URL.query
+ # queries is a list of parameters where each parameter is set to the payload
+ queries = [query.split("=")[0] + "=" + FULL_PAYLOAD.decode('utf-8') for query in query_string.split("&")]
+ new_query_string = "&".join(queries)
+ new_URL = parsed_URL._replace(query=new_query_string).geturl()
+ body = requests.get(new_URL, cookies=cookies).text.lower()
+ xss_info = get_XSS_data(body, new_URL, "Query")
+ sqli_info = get_SQLi_data(body, original_body, new_URL, "Query")
+ return xss_info, sqli_info
+
+
+def log_XSS_data(xss_info: Optional[XSSData]) -> None:
+ """ Log information about the given XSS to mitmproxy """
+ # If it is None, then there is no info to log
+ if not xss_info:
+ return
+ ctx.log.error("===== XSS Found ====")
+ ctx.log.error("XSS URL: %s" % xss_info.url)
+ ctx.log.error("Injection Point: %s" % xss_info.injection_point)
+ ctx.log.error("Suggested Exploit: %s" % xss_info.exploit)
+ ctx.log.error("Line: %s" % xss_info.line)
+
+
+def log_SQLi_data(sqli_info: Optional[SQLiData]) -> None:
+ """ Log information about the given SQLi to mitmproxy """
+ if not sqli_info:
+ return
+ ctx.log.error("===== SQLi Found =====")
+ ctx.log.error("SQLi URL: %s" % sqli_info.url.decode('utf-8'))
+ ctx.log.error("Injection Point: %s" % sqli_info.injection_point.decode('utf-8'))
+ ctx.log.error("Regex used: %s" % sqli_info.regex.decode('utf-8'))
+ ctx.log.error("Suspected DBMS: %s" % sqli_info.dbms.decode('utf-8'))
+
+
+def get_SQLi_data(new_body: str, original_body: str, request_URL: str, injection_point: str) -> Optional[SQLiData]:
+    """ Return a SQLiData if there is a SQLi otherwise return None """
+ # Regexes taken from Damn Small SQLi Scanner: https://github.com/stamparm/DSSS/blob/master/dsss.py#L17
+ DBMS_ERRORS = {
+ "MySQL": (r"SQL syntax.*MySQL", r"Warning.*mysql_.*", r"valid MySQL result", r"MySqlClient\."),
+ "PostgreSQL": (r"PostgreSQL.*ERROR", r"Warning.*\Wpg_.*", r"valid PostgreSQL result", r"Npgsql\."),
+ "Microsoft SQL Server": (r"Driver.* SQL[\-\_\ ]*Server", r"OLE DB.* SQL Server", r"(\W|\A)SQL Server.*Driver",
+ r"Warning.*mssql_.*", r"(\W|\A)SQL Server.*[0-9a-fA-F]{8}",
+ r"(?s)Exception.*\WSystem\.Data\.SqlClient\.", r"(?s)Exception.*\WRoadhouse\.Cms\."),
+ "Microsoft Access": (r"Microsoft Access Driver", r"JET Database Engine", r"Access Database Engine"),
+ "Oracle": (r"\bORA-[0-9][0-9][0-9][0-9]", r"Oracle error", r"Oracle.*Driver", r"Warning.*\Woci_.*", r"Warning.*\Wora_.*"),
+ "IBM DB2": (r"CLI Driver.*DB2", r"DB2 SQL error", r"\bdb2_\w+\("),
+ "SQLite": (r"SQLite/JDBCDriver", r"SQLite.Exception", r"System.Data.SQLite.SQLiteException", r"Warning.*sqlite_.*",
+ r"Warning.*SQLite3::", r"\[SQLITE_ERROR\]"),
+ "Sybase": (r"(?i)Warning.*sybase.*", r"Sybase message", r"Sybase.*Server message.*"),
+ }
+ for dbms, regexes in DBMS_ERRORS.items():
+ for regex in regexes:
+ if re.search(regex, new_body) and not re.search(regex, original_body):
+ return SQLiData(request_URL,
+ injection_point,
+ regex,
+ dbms)
+
+
+# A qc is either ' or "
+def inside_quote(qc: str, substring: bytes, text_index: int, body: bytes) -> bool:
+    """ Whether the text_index-th occurrence of the given substring in the body
+        is inside quotes, as defined by the supplied quote character qc """
+ substring = substring.decode('utf-8')
+ body = body.decode('utf-8')
+ num_substrings_found = 0
+ in_quote = False
+ for index, char in enumerate(body):
+ # Whether the next chunk of len(substring) chars is the substring
+ next_part_is_substring = (
+ (not (index + len(substring) > len(body))) and
+ (body[index:index + len(substring)] == substring)
+ )
+ # Whether this char is escaped with a \
+ is_not_escaped = (
+ (index - 1 < 0 or index - 1 > len(body)) or
+ (body[index - 1] != "\\")
+ )
+ if char == qc and is_not_escaped:
+ in_quote = not in_quote
+ if next_part_is_substring:
+ if num_substrings_found == text_index:
+ return in_quote
+ num_substrings_found += 1
+ return False
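+# A couple of quick illustrations of inside_quote (not taken from the test suite):
+#   inside_quote("'", b"x", 0, b"var a = 'x';")  -> True   (the first "x" is inside '...')
+#   inside_quote("'", b"x", 0, b"var a = x;")    -> False  (no quotes around it)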
+
+
+def paths_to_text(html: str, str: str) -> List[str]:
+    """ Return a list of paths to the given str in the given HTML tree
+        - Note that it walks the tree in document order """
+
+ def remove_last_occurence_of_sub_string(str: str, substr: str):
+        """ Delete the last occurrence of substr from str
+ String String -> String
+ """
+ index = str.rfind(substr)
+ return str[:index] + str[index + len(substr):]
+
+ class PathHTMLParser(HTMLParser):
+ currentPath = ""
+ paths = []
+
+ def handle_starttag(self, tag, attrs):
+ self.currentPath += ("/" + tag)
+
+ def handle_endtag(self, tag):
+ self.currentPath = remove_last_occurence_of_sub_string(self.currentPath, "/" + tag)
+
+ def handle_data(self, data):
+ if str in data:
+ self.paths.append(self.currentPath)
+
+ parser = PathHTMLParser()
+ parser.feed(html)
+ return parser.paths
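+# For example (a minimal illustration of the behaviour the tests exercise):
+#   paths_to_text("<html><head><h1>STRING</h1></head><script>STRING</script></html>", "STRING")
+# returns ["/html/head/h1", "/html/script"].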
+
+
+def get_XSS_data(body: str, request_URL: str, injection_point: str) -> Optional[XSSData]:
+    """ Return an XSSData if there is an XSS otherwise return None """
+ def in_script(text, index, body) -> bool:
+        """ Whether the index-th occurrence of the given text in the body
+            is inside a script tag """
+ paths = paths_to_text(body.decode('utf-8'), text.decode("utf-8"))
+ try:
+ path = paths[index]
+ return "script" in path
+ except IndexError:
+ return False
+
+ def in_HTML(text: bytes, index: int, body: bytes) -> bool:
+        """ Whether the index-th occurrence of the given text in the body is
+            inside the HTML proper, i.e. not inside a script tag and not part
+            of an HTML attribute """
+        # if there is a < then the HTML parser will interpret it as a tag, so only search for the text before it
+ text = text.split(b"<")[0]
+ paths = paths_to_text(body.decode('utf-8'), text.decode("utf-8"))
+ try:
+ path = paths[index]
+ return "script" not in path
+ except IndexError:
+ return False
+
+ def inject_javascript_handler(html: str) -> bool:
+ """ Whether you can inject a Javascript:alert(0) as a link """
+ class injectJSHandlerHTMLParser(HTMLParser):
+ injectJSHandler = False
+
+ def handle_starttag(self, tag, attrs):
+ for name, value in attrs:
+ if name == "href" and value.startswith(FRONT_WALL.decode('utf-8')):
+ self.injectJSHandler = True
+
+ parser = injectJSHandlerHTMLParser()
+ parser.feed(html)
+ return parser.injectJSHandler
+ # Only convert the body to bytes if needed
+ if isinstance(body, str):
+ body = bytes(body, 'utf-8')
+ # Regex for between 24 and 72 (aka 24*3) characters encapsulated by the walls
+ regex = re.compile(b"""%s.{24,72}?%s""" % (FRONT_WALL, BACK_WALL))
+ matches = regex.findall(body)
+ for index, match in enumerate(matches):
+ # Where the string is injected into the HTML
+ in_script = in_script(match, index, body)
+ in_HTML = in_HTML(match, index, body)
+ in_tag = not in_script and not in_HTML
+ in_single_quotes = inside_quote("'", match, index, body)
+ in_double_quotes = inside_quote('"', match, index, body)
+ # Whether you can inject:
+        inject_open_angle = b"ao<ac" in match  # open angle brackets
+        inject_close_angle = b"c>so" in match  # close angle brackets
+ inject_single_quotes = b"s'd" in match # single quotes
+ inject_double_quotes = b'd"ao' in match # double quotes
+ inject_slash = b"sl/bsl" in match # forward slashes
+ inject_semi = b"se;sl" in match # semicolons
+ inject_equals = b"eq=" in match # equals sign
+        if in_script and inject_slash and inject_open_angle and inject_close_angle:  # e.g. <script>PAYLOAD;</script>
+            return XSSData(request_URL,
+                           injection_point,
+                           '</script><script>alert(0)</script><script>',
+                           match.decode('utf-8'))
+        elif in_script and in_single_quotes and inject_single_quotes and inject_semi:  # e.g. <script>t='PAYLOAD';</script>
+            return XSSData(request_URL,
+                           injection_point,
+                           "';alert(0);g='",
+                           match.decode('utf-8'))
+        elif in_script and in_double_quotes and inject_double_quotes and inject_semi:  # e.g. <script>t="PAYLOAD";</script>
+ return XSSData(request_URL,
+ injection_point,
+ '";alert(0);g="',
+ match.decode('utf-8'))
+ elif in_tag and in_single_quotes and inject_single_quotes and inject_open_angle and inject_close_angle and inject_slash:
+            # e.g. <a href='PAYLOAD'>Test</a>
+ return XSSData(request_URL,
+ injection_point,
+                           "'><script>alert(0)</script>",
+ match.decode('utf-8'))
+ elif in_tag and in_double_quotes and inject_double_quotes and inject_open_angle and inject_close_angle and inject_slash:
+            # e.g. <a href="PAYLOAD">Test</a>
+ return XSSData(request_URL,
+ injection_point,
+                           '"><script>alert(0)</script>',
+ match.decode('utf-8'))
+ elif in_tag and not in_double_quotes and not in_single_quotes and inject_open_angle and inject_close_angle and inject_slash:
+            # e.g. <a href=PAYLOAD>Test</a>
+ return XSSData(request_URL,
+ injection_point,
+                           '><script>alert(0)</script>',
+ match.decode('utf-8'))
+        elif inject_javascript_handler(body.decode('utf-8')):  # e.g. <a href=PAYLOAD>Test</a>
+ return XSSData(request_URL,
+ injection_point,
+ 'Javascript:alert(0)',
+ match.decode('utf-8'))
+        elif in_tag and in_double_quotes and inject_double_quotes and inject_equals:  # e.g. <a href="PAYLOAD">Test</a>
+ return XSSData(request_URL,
+ injection_point,
+ '" onmouseover="alert(0)" t="',
+ match.decode('utf-8'))
+        elif in_tag and in_single_quotes and inject_single_quotes and inject_equals:  # e.g. <a href='PAYLOAD'>Test</a>
+ return XSSData(request_URL,
+ injection_point,
+ "' onmouseover='alert(0)' t='",
+ match.decode('utf-8'))
+        elif in_tag and not in_single_quotes and not in_double_quotes and inject_equals:  # e.g. <a href=PAYLOAD>Test</a>
+ return XSSData(request_URL,
+ injection_point,
+ " onmouseover=alert(0) t=",
+ match.decode('utf-8'))
+        elif in_HTML and not in_script and inject_open_angle and inject_close_angle and inject_slash:  # e.g. <html>PAYLOAD</html>
+ return XSSData(request_URL,
+ injection_point,
+                           '<script>alert(0)</script>',
+ match.decode('utf-8'))
+ else:
+ return None
+
+
+# The response hook is this addon's entry point: mitmproxy calls it for every HTTP response.
+def response(flow: http.HTTPFlow) -> None:
+ cookiesDict = get_cookies(flow)
+ # Example: http://xss.guru/unclaimedScriptTag.html
+ find_unclaimed_URLs(flow.response.content, flow.request.url)
+ results = test_end_of_URL_injection(flow.response.content.decode('utf-8'), flow.request.url, cookiesDict)
+ log_XSS_data(results[0])
+ log_SQLi_data(results[1])
+ # Example: https://daviddworken.com/vulnerableReferer.php
+ results = test_referer_injection(flow.response.content.decode('utf-8'), flow.request.url, cookiesDict)
+ log_XSS_data(results[0])
+ log_SQLi_data(results[1])
+ # Example: https://daviddworken.com/vulnerableUA.php
+ results = test_user_agent_injection(flow.response.content.decode('utf-8'), flow.request.url, cookiesDict)
+ log_XSS_data(results[0])
+ log_SQLi_data(results[1])
+ if "?" in flow.request.url:
+ # Example: https://daviddworken.com/vulnerable.php?name=
+ results = test_query_injection(flow.response.content.decode('utf-8'), flow.request.url, cookiesDict)
+ log_XSS_data(results[0])
+ log_SQLi_data(results[1])
diff --git a/test/mitmproxy/examples/test_xss_scanner.py b/test/mitmproxy/examples/test_xss_scanner.py
new file mode 100755
index 000000000..14ee69027
--- /dev/null
+++ b/test/mitmproxy/examples/test_xss_scanner.py
@@ -0,0 +1,368 @@
+import pytest
+import requests
+from examples.complex import xss_scanner as xss
+from mitmproxy.test import tflow, tutils
+
+
+class TestXSSScanner():
+ def test_get_XSS_info(self):
+        # First type of exploit: <script>PAYLOAD;</script>
+        # Exploitable:
+        xss_info = xss.get_XSS_data(b"<html><script>%s;</script></html>" %
+                                    xss.FULL_PAYLOAD,
+                                    "https://example.com",
+                                    "End of URL")
+        expected_xss_info = xss.XSSData('https://example.com',
+                                        "End of URL",
+                                        '</script><script>alert(0)</script><script>',
+                                        xss.FULL_PAYLOAD.decode('utf-8'))
+        assert xss_info == expected_xss_info
+        # Still exploitable with the quotes escaped:
+        xss_info = xss.get_XSS_data(b"<html><script>%s;</script></html>" %
+                                    xss.FULL_PAYLOAD.replace(b"'", b"%27").replace(b'"', b"%22"),
+                                    "https://example.com",
+                                    "End of URL")
+        expected_xss_info = xss.XSSData("https://example.com",
+                                        "End of URL",
+                                        '</script><script>alert(0)</script><script>',
+                                        xss.FULL_PAYLOAD.replace(b"'", b"%27").replace(b'"', b"%22").decode('utf-8'))
+        assert xss_info == expected_xss_info
+        # Non-Exploitable (the forward slashes are escaped as well):
+        xss_info = xss.get_XSS_data(b"<html><script>%s;</script></html>" %
+                                    xss.FULL_PAYLOAD.replace(b"'", b"%27").replace(b'"', b"%22").replace(b"/", b"%2F"),
+                                    "https://example.com",
+                                    "End of URL")
+        assert xss_info is None
+        # Second type of exploit: <script>t='PAYLOAD';</script>
+        # Exploitable:
+        xss_info = xss.get_XSS_data(b"<html><script>t='%s';</script></html>" %
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").replace(b"\"", b"%22"),
+ "https://example.com",
+ "End of URL")
+ expected_xss_info = xss.XSSData("https://example.com",
+ "End of URL",
+ "';alert(0);g='",
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E")
+ .replace(b"\"", b"%22").decode('utf-8'))
+ assert xss_info == expected_xss_info
+ # Non-Exploitable:
+        xss_info = xss.get_XSS_data(b"<html><script>t='%s';</script></html>" %
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b"\"", b"%22").replace(b"'", b"%22"),
+ "https://example.com",
+ "End of URL")
+ assert xss_info is None
+        # Third type of exploit: <script>t="PAYLOAD";</script>
+        # Exploitable:
+        xss_info = xss.get_XSS_data(b"<html><script>t=\"%s\";</script></html>" %
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").replace(b"'", b"%27"),
+ "https://example.com",
+ "End of URL")
+ expected_xss_info = xss.XSSData("https://example.com",
+ "End of URL",
+ '";alert(0);g="',
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E")
+ .replace(b"'", b"%27").decode('utf-8'))
+ assert xss_info == expected_xss_info
+ # Non-Exploitable:
+        xss_info = xss.get_XSS_data(b"<html><script>t=\"%s\";</script></html>" %
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b"'", b"%27").replace(b"\"", b"%22"),
+ "https://example.com",
+ "End of URL")
+ assert xss_info is None
+        # Fourth type of exploit: <a href='PAYLOAD'>Test</a>
+ # Exploitable:
+ xss_info = xss.get_XSS_data(b"Test" %
+ xss.FULL_PAYLOAD,
+ "https://example.com",
+ "End of URL")
+ expected_xss_info = xss.XSSData("https://example.com",
+ "End of URL",
+                                        "'><script>alert(0)</script>",
+ xss.FULL_PAYLOAD.decode('utf-8'))
+ assert xss_info == expected_xss_info
+ # Non-Exploitable:
+ xss_info = xss.get_XSS_data(b"Test" %
+ xss.FULL_PAYLOAD.replace(b"'", b"%27"),
+ "https://example.com",
+ "End of URL")
+ assert xss_info is None
+        # Fifth type of exploit: <a href="PAYLOAD">Test</a>
+ # Exploitable:
+ xss_info = xss.get_XSS_data(b"Test" %
+ xss.FULL_PAYLOAD.replace(b"'", b"%27"),
+ "https://example.com",
+ "End of URL")
+ expected_xss_info = xss.XSSData("https://example.com",
+ "End of URL",
+                                        "\"><script>alert(0)</script>",
+ xss.FULL_PAYLOAD.replace(b"'", b"%27").decode('utf-8'))
+ assert xss_info == expected_xss_info
+ # Non-Exploitable:
+ xss_info = xss.get_XSS_data(b"Test" %
+ xss.FULL_PAYLOAD.replace(b"'", b"%27").replace(b"\"", b"%22"),
+ "https://example.com",
+ "End of URL")
+ assert xss_info is None
+        # Sixth type of exploit: <a href=PAYLOAD>Test</a>
+ # Exploitable:
+ xss_info = xss.get_XSS_data(b"Test" %
+ xss.FULL_PAYLOAD,
+ "https://example.com",
+ "End of URL")
+ expected_xss_info = xss.XSSData("https://example.com",
+ "End of URL",
+                                        "><script>alert(0)</script>",
+ xss.FULL_PAYLOAD.decode('utf-8'))
+ assert xss_info == expected_xss_info
+ # Non-Exploitable
+ xss_info = xss.get_XSS_data(b"Test" %
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E")
+ .replace(b"=", b"%3D"),
+ "https://example.com",
+ "End of URL")
+ assert xss_info is None
+        # Seventh type of exploit: <html>PAYLOAD</html>
+ # Exploitable:
+ xss_info = xss.get_XSS_data(b"%s" %
+ xss.FULL_PAYLOAD,
+ "https://example.com",
+ "End of URL")
+ expected_xss_info = xss.XSSData("https://example.com",
+ "End of URL",
+                                        "<script>alert(0)</script>",
+ xss.FULL_PAYLOAD.decode('utf-8'))
+ assert xss_info == expected_xss_info
+ # Non-Exploitable
+ xss_info = xss.get_XSS_data(b"%s" %
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").replace(b"/", b"%2F"),
+ "https://example.com",
+ "End of URL")
+ assert xss_info is None
+        # Eighth type of exploit: <a href=PAYLOAD>Test</a>
+ # Exploitable:
+ xss_info = xss.get_XSS_data(b"Test" %
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E"),
+ "https://example.com",
+ "End of URL")
+ expected_xss_info = xss.XSSData("https://example.com",
+ "End of URL",
+ "Javascript:alert(0)",
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").decode('utf-8'))
+ assert xss_info == expected_xss_info
+ # Non-Exploitable:
+ xss_info = xss.get_XSS_data(b"Test" %
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E")
+ .replace(b"=", b"%3D"),
+ "https://example.com",
+ "End of URL")
+ assert xss_info is None
+        # Ninth type of exploit: <a href="PAYLOAD">Test</a>
+ # Exploitable:
+ xss_info = xss.get_XSS_data(b"Test" %
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E"),
+ "https://example.com",
+ "End of URL")
+ expected_xss_info = xss.XSSData("https://example.com",
+ "End of URL",
+ '" onmouseover="alert(0)" t="',
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").decode('utf-8'))
+ assert xss_info == expected_xss_info
+ # Non-Exploitable:
+ xss_info = xss.get_XSS_data(b"Test" %
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E")
+ .replace(b'"', b"%22"),
+ "https://example.com",
+ "End of URL")
+ assert xss_info is None
+        # Tenth type of exploit: <a href='PAYLOAD'>Test</a>
+ # Exploitable:
+ xss_info = xss.get_XSS_data(b"Test" %
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E"),
+ "https://example.com",
+ "End of URL")
+ expected_xss_info = xss.XSSData("https://example.com",
+ "End of URL",
+ "' onmouseover='alert(0)' t='",
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").decode('utf-8'))
+ assert xss_info == expected_xss_info
+ # Non-Exploitable:
+ xss_info = xss.get_XSS_data(b"Test" %
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E")
+ .replace(b"'", b"%22"),
+ "https://example.com",
+ "End of URL")
+ assert xss_info is None
+        # Eleventh type of exploit: <a href=PAYLOAD>Test</a>
+ # Exploitable:
+ xss_info = xss.get_XSS_data(b"Test" %
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E"),
+ "https://example.com",
+ "End of URL")
+ expected_xss_info = xss.XSSData("https://example.com",
+ "End of URL",
+ " onmouseover=alert(0) t=",
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E").decode('utf-8'))
+ assert xss_info == expected_xss_info
+ # Non-Exploitable:
+ xss_info = xss.get_XSS_data(b"Test" %
+ xss.FULL_PAYLOAD.replace(b"<", b"%3C").replace(b">", b"%3E")
+ .replace(b"=", b"%3D"),
+ "https://example.com",
+ "End of URL")
+ assert xss_info is None
+
+ def test_get_SQLi_data(self):
+ sqli_data = xss.get_SQLi_data("SQL syntax MySQL",
+ "",
+ "https://example.com",
+ "End of URL")
+ expected_sqli_data = xss.SQLiData("https://example.com",
+ "End of URL",
+ "SQL syntax.*MySQL",
+ "MySQL")
+ assert sqli_data == expected_sqli_data
+ sqli_data = xss.get_SQLi_data("SQL syntax MySQL",
+ "SQL syntax MySQL",
+ "https://example.com",
+ "End of URL")
+ assert sqli_data is None
+
+ def test_inside_quote(self):
+ assert not xss.inside_quote("'", b"no", 0, b"no")
+ assert xss.inside_quote("'", b"yes", 0, b"'yes'")
+ assert xss.inside_quote("'", b"yes", 1, b"'yes'otherJunk'yes'more")
+ assert not xss.inside_quote("'", b"longStringNotInIt", 1, b"short")
+
+ def test_paths_to_text(self):
+        text = xss.paths_to_text("""<html><head><h1>STRING</h1></head><script>STRING</script>
+        <button value="STRING">
+        </button></html>""", "STRING")
+ expected_text = ["/html/head/h1", "/html/script"]
+ assert text == expected_text
+ assert xss.paths_to_text("""""", "STRING") == []
+
+ def mocked_requests_vuln(*args, headers=None, cookies=None):
+ class MockResponse:
+ def __init__(self, html, headers=None, cookies=None):
+ self.text = html
+        return MockResponse("<html>%s</html>" % xss.FULL_PAYLOAD)
+
+ def mocked_requests_invuln(*args, headers=None, cookies=None):
+ class MockResponse:
+ def __init__(self, html, headers=None, cookies=None):
+ self.text = html
+ return MockResponse("")
+
+ def test_test_end_of_url_injection(self, monkeypatch):
+ monkeypatch.setattr(requests, 'get', self.mocked_requests_vuln)
+ xss_info = xss.test_end_of_URL_injection("", "https://example.com/index.html", {})[0]
+        expected_xss_info = xss.XSSData('https://example.com/index.html/1029zxcs\'d"ao<ac>so[sb]po(pc)se;sl/bsl\\eq=3847asd',
+                                        'End of URL',
+                                        '<script>alert(0)</script>',
+                                        '1029zxcs\\\'d"ao<ac>so[sb]po(pc)se;sl/bsl\\\\eq=3847asd')
+ sqli_info = xss.test_end_of_URL_injection("", "https://example.com/", {})[1]
+ assert xss_info == expected_xss_info
+ assert sqli_info is None
+
+ def test_test_referer_injection(self, monkeypatch):
+ monkeypatch.setattr(requests, 'get', self.mocked_requests_vuln)
+ xss_info = xss.test_referer_injection("", "https://example.com/", {})[0]
+ expected_xss_info = xss.XSSData('https://example.com/',
+ 'Referer',
+                                        '<script>alert(0)</script>',
+                                        '1029zxcs\\\'d"ao<ac>so[sb]po(pc)se;sl/bsl\\\\eq=3847asd')
+ sqli_info = xss.test_referer_injection("", "https://example.com/", {})[1]
+ assert xss_info == expected_xss_info
+ assert sqli_info is None
+
+ def test_test_user_agent_injection(self, monkeypatch):
+ monkeypatch.setattr(requests, 'get', self.mocked_requests_vuln)
+ xss_info = xss.test_user_agent_injection("", "https://example.com/", {})[0]
+ expected_xss_info = xss.XSSData('https://example.com/',
+ 'User Agent',
+                                        '<script>alert(0)</script>',
+                                        '1029zxcs\\\'d"ao<ac>so[sb]po(pc)se;sl/bsl\\\\eq=3847asd')
+ sqli_info = xss.test_user_agent_injection("", "https://example.com/", {})[1]
+ assert xss_info == expected_xss_info
+ assert sqli_info is None
+
+ def test_test_query_injection(self, monkeypatch):
+ monkeypatch.setattr(requests, 'get', self.mocked_requests_vuln)
+ xss_info = xss.test_query_injection("", "https://example.com/vuln.php?cmd=ls", {})[0]
+        expected_xss_info = xss.XSSData('https://example.com/vuln.php?cmd=1029zxcs\'d"ao<ac>so[sb]po(pc)se;sl/bsl\\eq=3847asd',
+                                        'Query',
+                                        '<script>alert(0)</script>',
+                                        '1029zxcs\\\'d"ao<ac>so[sb]po(pc)se;sl/bsl\\\\eq=3847asd')
+ sqli_info = xss.test_query_injection("", "https://example.com/vuln.php?cmd=ls", {})[1]
+ assert xss_info == expected_xss_info
+ assert sqli_info is None
+
+ @pytest.fixture
+ def logger(self):
+ class Logger():
+ def __init__(self):
+ self.args = []
+
+ def error(self, str):
+ self.args.append(str)
+ return Logger()
+
+ def test_find_unclaimed_URLs(self, monkeypatch, logger):
+ logger.args = []
+ monkeypatch.setattr("mitmproxy.ctx.log", logger)
+        xss.find_unclaimed_URLs("<html><script src=\"http://google.com\"></script></html>",
+ "https://example.com")
+ assert logger.args == []
+        xss.find_unclaimed_URLs("<html><script src=\"http://unclaimedDomainName.com\"></script></html>",
+ "https://example.com")
+ assert logger.args[0] == 'XSS found in https://example.com due to unclaimed URL "http://unclaimedDomainName.com" in script tag.'
+
+ def test_log_XSS_data(self, monkeypatch, logger):
+ logger.args = []
+ monkeypatch.setattr("mitmproxy.ctx.log", logger)
+ xss.log_XSS_data(None)
+ assert logger.args == []
+ # self, url: str, injection_point: str, exploit: str, line: str
+ xss.log_XSS_data(xss.XSSData('https://example.com',
+ 'Location',
+ 'String',
+ 'Line of HTML'))
+ assert logger.args[0] == '===== XSS Found ===='
+ assert logger.args[1] == 'XSS URL: https://example.com'
+ assert logger.args[2] == 'Injection Point: Location'
+ assert logger.args[3] == 'Suggested Exploit: String'
+ assert logger.args[4] == 'Line: Line of HTML'
+
+ def test_log_SQLi_data(self, monkeypatch, logger):
+ logger.args = []
+ monkeypatch.setattr("mitmproxy.ctx.log", logger)
+ xss.log_SQLi_data(None)
+ assert logger.args == []
+ xss.log_SQLi_data(xss.SQLiData(b'https://example.com',
+ b'Location',
+ b'Oracle.*Driver',
+ b'Oracle'))
+ assert logger.args[0] == '===== SQLi Found ====='
+ assert logger.args[1] == 'SQLi URL: https://example.com'
+ assert logger.args[2] == 'Injection Point: Location'
+ assert logger.args[3] == 'Regex used: Oracle.*Driver'
+
+ def test_get_cookies(self):
+ mocked_req = tutils.treq()
+ mocked_req.cookies = [("cookieName2", "cookieValue2")]
+ mocked_flow = tflow.tflow(req=mocked_req)
+ # It only uses the request cookies
+ assert xss.get_cookies(mocked_flow) == {"cookieName2": "cookieValue2"}
+
+ def test_response(self, monkeypatch, logger):
+ logger.args = []
+ monkeypatch.setattr("mitmproxy.ctx.log", logger)
+ monkeypatch.setattr(requests, 'get', self.mocked_requests_invuln)
+ mocked_flow = tflow.tflow(req=tutils.treq(path=b"index.html?q=1"), resp=tutils.tresp(content=b''))
+ xss.response(mocked_flow)
+ assert logger.args == []
+
+ def test_data_equals(self):
+ xssData = xss.XSSData("a", "b", "c", "d")
+ sqliData = xss.SQLiData("a", "b", "c", "d")
+ assert xssData == xssData
+ assert sqliData == sqliData