mirror of
https://github.com/Grasscutters/mitmproxy.git
synced 2024-11-26 18:18:25 +00:00
131 lines
5.1 KiB
Python
131 lines
5.1 KiB
Python
|
import abc
|
||
|
import logging
|
||
|
import random
|
||
|
import string
|
||
|
import time
|
||
|
from typing import Dict, List, cast, Any
|
||
|
|
||
|
import mitmproxy.http
|
||
|
from mitmproxy import flowfilter
|
||
|
from mitmproxy import master
|
||
|
from mitmproxy.script import concurrent
|
||
|
from selenium import webdriver
|
||
|
|
||
|
logger = logging.getLogger(__name__)
|
||
|
|
||
|
cookie_key_name = {
|
||
|
"path": "Path",
|
||
|
"expires": "Expires",
|
||
|
"domain": "Domain",
|
||
|
"is_http_only": "HttpOnly",
|
||
|
"is_secure": "Secure"
|
||
|
}
|
||
|
|
||
|
|
||
|
def randomString(string_length=10):
|
||
|
"""Generate a random string of fixed length """
|
||
|
letters = string.ascii_lowercase
|
||
|
return ''.join(random.choice(letters) for i in range(string_length))
|
||
|
|
||
|
|
||
|
class AuthorizationOracle(abc.ABC):
|
||
|
"""Abstract class for an authorization oracle which decides if a given request or response is authenticated."""
|
||
|
@abc.abstractmethod
|
||
|
def is_unauthorized_request(self, flow: mitmproxy.http.HTTPFlow) -> bool:
|
||
|
pass
|
||
|
|
||
|
@abc.abstractmethod
|
||
|
def is_unauthorized_response(self, flow: mitmproxy.http.HTTPFlow) -> bool:
|
||
|
pass
|
||
|
|
||
|
|
||
|
class SeleniumAddon:
|
||
|
""" This Addon can be used in combination with web application scanners in order to help them to authenticate
|
||
|
against a web application.
|
||
|
|
||
|
Since the authentication is highly dependant on the web application, this add-on includes the abstract method
|
||
|
*login*. In order to use the add-on, a class for the web application inheriting from SeleniumAddon needs to be
|
||
|
created. This class needs to include the concrete selenium actions necessary to authenticate against the web
|
||
|
application. In addition, an authentication oracle which inherits from AuthorizationOracle should be created.
|
||
|
"""
|
||
|
def __init__(self, fltr: str, domain: str,
|
||
|
auth_oracle: AuthorizationOracle):
|
||
|
self.filter = flowfilter.parse(fltr)
|
||
|
self.auth_oracle = auth_oracle
|
||
|
self.domain = domain
|
||
|
self.browser = None
|
||
|
self.set_cookies = False
|
||
|
|
||
|
options = webdriver.FirefoxOptions()
|
||
|
options.headless = True
|
||
|
|
||
|
profile = webdriver.FirefoxProfile()
|
||
|
profile.set_preference('network.proxy.type', 0)
|
||
|
self.browser = webdriver.Firefox(firefox_profile=profile,
|
||
|
options=options)
|
||
|
self.cookies: List[Dict[str, str]] = []
|
||
|
|
||
|
def _login(self, flow):
|
||
|
self.cookies = self.login(flow)
|
||
|
self.browser.get("about:blank")
|
||
|
self._set_request_cookies(flow)
|
||
|
self.set_cookies = True
|
||
|
|
||
|
def request(self, flow: mitmproxy.http.HTTPFlow):
|
||
|
if flow.request.is_replay:
|
||
|
logger.warning("Caught replayed request: " + str(flow))
|
||
|
if (not self.filter or self.filter(flow)) and self.auth_oracle.is_unauthorized_request(flow):
|
||
|
logger.debug("unauthorized request detected, perform login")
|
||
|
self._login(flow)
|
||
|
|
||
|
# has to be concurrent because replay.client is blocking and replayed flows
|
||
|
# will also call response
|
||
|
@concurrent
|
||
|
def response(self, flow: mitmproxy.http.HTTPFlow):
|
||
|
if flow.response and (self.filter is None or self.filter(flow)):
|
||
|
if self.auth_oracle.is_unauthorized_response(flow):
|
||
|
self._login(flow)
|
||
|
new_flow = flow.copy()
|
||
|
if master and hasattr(master, 'commands'):
|
||
|
# cast necessary for mypy
|
||
|
cast(Any, master).commands.call("replay.client", [new_flow])
|
||
|
count = 0
|
||
|
while new_flow.response is None and count < 10:
|
||
|
logger.error("waiting since " + str(count) + " ...")
|
||
|
count = count + 1
|
||
|
time.sleep(1)
|
||
|
if new_flow.response:
|
||
|
flow.response = new_flow.response
|
||
|
else:
|
||
|
logger.warning("Could not call 'replay.client' command since master was not initialized yet.")
|
||
|
|
||
|
if self.set_cookies and flow.response:
|
||
|
logger.debug("set set-cookie header for response")
|
||
|
self._set_set_cookie_headers(flow)
|
||
|
self.set_cookies = False
|
||
|
|
||
|
def done(self):
|
||
|
self.browser.close()
|
||
|
|
||
|
def _set_set_cookie_headers(self, flow: mitmproxy.http.HTTPFlow):
|
||
|
if flow.response and self.cookies:
|
||
|
for cookie in self.cookies:
|
||
|
parts = [f"{cookie['name']}={cookie['value']}"]
|
||
|
for k, v in cookie_key_name.items():
|
||
|
if k in cookie and isinstance(cookie[k], str):
|
||
|
parts.append(f"{v}={cookie[k]}")
|
||
|
elif k in cookie and isinstance(cookie[k], bool) and cookie[k]:
|
||
|
parts.append(cookie[k])
|
||
|
encoded_c = "; ".join(parts)
|
||
|
flow.response.headers["set-cookie"] = encoded_c
|
||
|
|
||
|
def _set_request_cookies(self, flow: mitmproxy.http.HTTPFlow):
|
||
|
if self.cookies:
|
||
|
cookies = "; ".join(
|
||
|
map(lambda c: f"{c['name']}={c['value']}", self.cookies))
|
||
|
flow.request.headers["cookie"] = cookies
|
||
|
|
||
|
@abc.abstractmethod
|
||
|
def login(self, flow: mitmproxy.http.HTTPFlow) -> List[Dict[str, str]]:
|
||
|
pass
|