diff --git a/cogs/check_su_platform_authorisation.py b/cogs/check_su_platform_authorisation.py index e0b8a3cc..b45fbe4c 100644 --- a/cogs/check_su_platform_authorisation.py +++ b/cogs/check_su_platform_authorisation.py @@ -4,19 +4,18 @@ from enum import Enum from typing import TYPE_CHECKING, override -import aiohttp -import bs4 import discord from discord.ext import tasks from config import settings -from utils import GLOBAL_SSL_CONTEXT, CommandChecks, TeXBotBaseCog +from utils import CommandChecks, TeXBotBaseCog from utils.error_capture_decorators import ( capture_guild_does_not_exist_error, ) +from utils.msl import get_su_platform_access_cookie_status, get_su_platform_organisations if TYPE_CHECKING: - from collections.abc import Iterable, Mapping, Sequence + from collections.abc import Sequence from collections.abc import Set as AbstractSet from logging import Logger from typing import Final @@ -31,21 +30,6 @@ logger: "Final[Logger]" = logging.getLogger("TeX-Bot") -REQUEST_HEADERS: "Final[Mapping[str, str]]" = { - "Cache-Control": "no-cache", - "Pragma": "no-cache", - "Expires": "0", -} - -REQUEST_COOKIES: "Final[Mapping[str, str]]" = { - ".AspNet.SharedCookie": settings["SU_PLATFORM_ACCESS_COOKIE"] -} - -SU_PLATFORM_PROFILE_URL: "Final[str]" = "https://guildofstudents.com/profile" -SU_PLATFORM_ORGANISATION_URL: "Final[str]" = ( - "https://www.guildofstudents.com/organisation/admin" -) - class SUPlatformAccessCookieStatus(Enum): """Enum class defining the status of the SU Platform Access Cookie.""" @@ -73,114 +57,7 @@ class SUPlatformAccessCookieStatus(Enum): ) -class CheckSUPlatformAuthorisationBaseCog(TeXBotBaseCog): - """Cog class that defines the base functionality for cookie authorisation checks.""" - - async def _fetch_url_content_with_session(self, url: str) -> str: - """Fetch the HTTP content at the given URL, using a shared aiohttp session.""" - async with ( - aiohttp.ClientSession( - headers=REQUEST_HEADERS, cookies=REQUEST_COOKIES - ) as http_session, - http_session.get(url=url, ssl=GLOBAL_SSL_CONTEXT) as http_response, - ): - return await http_response.text() - - async def get_su_platform_access_cookie_status(self) -> SUPlatformAccessCookieStatus: - """Retrieve the current validity status of the SU platform access cookie.""" - response_object: bs4.BeautifulSoup = bs4.BeautifulSoup( - await self._fetch_url_content_with_session(SU_PLATFORM_PROFILE_URL), "html.parser" - ) - page_title: bs4.Tag | bs4.NavigableString | None = response_object.find("title") - if not page_title or "Login" in str(page_title): - logger.warning("Token is invalid or expired.") - return SUPlatformAccessCookieStatus.INVALID - - organisation_admin_url: str = ( - f"{SU_PLATFORM_ORGANISATION_URL}/{settings['ORGANISATION_ID']}" - ) - response_html: str = await self._fetch_url_content_with_session(organisation_admin_url) - - if "admin tools" in response_html.lower(): - return SUPlatformAccessCookieStatus.AUTHORISED - - if "You do not have any permissions for this organisation" in response_html.lower(): - return SUPlatformAccessCookieStatus.VALID - - logger.warning( - "Unexpected response when checking SU platform access cookie authorisation." - ) - return SUPlatformAccessCookieStatus.INVALID - - async def get_su_platform_organisations(self) -> "Iterable[str]": - """Retrieve the MSL organisations the current SU platform cookie has access to.""" - response_object: bs4.BeautifulSoup = bs4.BeautifulSoup( - await self._fetch_url_content_with_session(SU_PLATFORM_PROFILE_URL), "html.parser" - ) - - page_title: bs4.Tag | bs4.NavigableString | None = response_object.find("title") - - if not page_title: - logger.warning( - "Profile page returned no content when checking " - "SU platform access cookie's authorisation." - ) - return () - - if "Login" in str(page_title): - logger.warning( - "Authentication redirected to login page. " - "SU platform access cookie is invalid or expired." - ) - return () - - profile_section_html: bs4.Tag | bs4.NavigableString | None = response_object.find( - "div", {"id": "profile_main"} - ) - - if profile_section_html is None: - logger.warning( - "Couldn't find the profile section of the user " - "when scraping the SU platform's website HTML." - ) - logger.debug("Retrieved HTML: %s", response_object.text) - return () - - user_name: bs4.Tag | bs4.NavigableString | int | None = profile_section_html.find("h1") - - if not isinstance(user_name, bs4.Tag): - logger.warning( - "Found user profile on the SU platform but couldn't find their name." - ) - logger.debug("Retrieved HTML: %s", response_object.text) - return () - - parsed_html: bs4.Tag | bs4.NavigableString | None = response_object.find( - "ul", {"id": "ulOrgs"} - ) - - if parsed_html is None or isinstance(parsed_html, bs4.NavigableString): - NO_ADMIN_TABLE_MESSAGE: Final[str] = ( - f"Failed to retrieve the admin table for user: {user_name.string}. " - "Please check you have used the correct SU platform access token!" - ) - logger.warning(NO_ADMIN_TABLE_MESSAGE) - return () - - organisations: Iterable[str] = [ - list_item.get_text(strip=True) for list_item in parsed_html.find_all("li") - ] - - logger.debug( - "SU platform access cookie has admin authorisation to: %s as user %s", - organisations, - user_name.text, - ) - - return organisations - - -class CheckSUPlatformAuthorisationCommandCog(CheckSUPlatformAuthorisationBaseCog): +class CheckSUPlatformAuthorisationCommandCog(TeXBotBaseCog): """Cog class that defines the "/check-su-platform-authorisation" command.""" @discord.slash_command( @@ -201,7 +78,7 @@ async def check_su_platform_authorisation(self, ctx: "TeXBotApplicationContext") async with ctx.typing(): su_platform_access_cookie_organisations: AbstractSet[str] = set( - await self.get_su_platform_organisations() + await get_su_platform_organisations() ) await ctx.followup.send( @@ -224,7 +101,7 @@ async def check_su_platform_authorisation(self, ctx: "TeXBotApplicationContext") ) -class CheckSUPlatformAuthorisationTaskCog(CheckSUPlatformAuthorisationBaseCog): +class CheckSUPlatformAuthorisationTaskCog(TeXBotBaseCog): """Cog class defining a repeated task for checking SU platform access cookie.""" @override @@ -256,7 +133,7 @@ async def su_platform_access_cookie_check_task(self) -> None: logger.debug("Running SU platform access cookie check task...") su_platform_access_cookie_status: tuple[int, str] = ( - await self.get_su_platform_access_cookie_status() + await get_su_platform_access_cookie_status() ).value logger.log( diff --git a/utils/msl/__init__.py b/utils/msl/__init__.py index 99e985e1..f21f51fd 100644 --- a/utils/msl/__init__.py +++ b/utils/msl/__init__.py @@ -2,6 +2,7 @@ from typing import TYPE_CHECKING +from .authorisation import get_su_platform_access_cookie_status, get_su_platform_organisations from .memberships import ( fetch_community_group_members_count, fetch_community_group_members_list, @@ -12,7 +13,10 @@ from collections.abc import Sequence __all__: "Sequence[str]" = ( + "GLOBAL_SSL_CONTEXT", "fetch_community_group_members_count", "fetch_community_group_members_list", + "get_su_platform_access_cookie_status", + "get_su_platform_organisations", "is_id_a_community_group_member", ) diff --git a/utils/msl/authorisation.py b/utils/msl/authorisation.py new file mode 100644 index 00000000..eeb63b46 --- /dev/null +++ b/utils/msl/authorisation.py @@ -0,0 +1,135 @@ +"""Module for authorisation checks.""" + +import logging +from typing import TYPE_CHECKING + +import aiohttp +import bs4 + +from cogs.check_su_platform_authorisation import SUPlatformAccessCookieStatus +from config import settings +from utils import GLOBAL_SSL_CONTEXT + +from .core import BASE_COOKIES, BASE_HEADERS + +if TYPE_CHECKING: + from collections.abc import Iterable, Sequence + from logging import Logger + from typing import Final + + +__all__: "Sequence[str]" = ( + "get_su_platform_access_cookie_status", + "get_su_platform_organisations", +) + + +logger: "Final[Logger]" = logging.getLogger("TeX-Bot") + + +SU_PLATFORM_PROFILE_URL: "Final[str]" = "https://guildofstudents.com/profile" +SU_PLATFORM_ORGANISATION_URL: "Final[str]" = ( + "https://www.guildofstudents.com/organisation/admin" +) + + +async def _fetch_url_content_with_session(url: str) -> str: + """Fetch the HTTP content at the given URL, using a shared aiohttp session.""" + async with ( + aiohttp.ClientSession(headers=BASE_HEADERS, cookies=BASE_COOKIES) as http_session, + http_session.get(url=url, ssl=GLOBAL_SSL_CONTEXT) as http_response, + ): + return await http_response.text() + + +async def get_su_platform_access_cookie_status() -> SUPlatformAccessCookieStatus: + """Retrieve the current validity status of the SU platform access cookie.""" + response_object: bs4.BeautifulSoup = bs4.BeautifulSoup( + await _fetch_url_content_with_session(SU_PLATFORM_PROFILE_URL), "html.parser" + ) + page_title: bs4.Tag | bs4.NavigableString | None = response_object.find("title") + if not page_title or "Login" in str(page_title): + logger.debug("Token is invalid or expired.") + return SUPlatformAccessCookieStatus.INVALID + + organisation_admin_url: str = ( + f"{SU_PLATFORM_ORGANISATION_URL}/{settings['ORGANISATION_ID']}" + ) + response_html: str = await _fetch_url_content_with_session(organisation_admin_url) + + if "admin tools" in response_html.lower(): + return SUPlatformAccessCookieStatus.AUTHORISED + + if "You do not have any permissions for this organisation" in response_html.lower(): + return SUPlatformAccessCookieStatus.VALID + + logger.warning( + "Unexpected response when checking SU platform access cookie authorisation." + ) + return SUPlatformAccessCookieStatus.INVALID + + +async def get_su_platform_organisations() -> "Iterable[str]": + """Retrieve the MSL organisations the current SU platform cookie has access to.""" + response_object: bs4.BeautifulSoup = bs4.BeautifulSoup( + await _fetch_url_content_with_session(SU_PLATFORM_PROFILE_URL), "html.parser" + ) + + page_title: bs4.Tag | bs4.NavigableString | None = response_object.find("title") + + if not page_title: + logger.warning( + "Profile page returned no content when checking " + "SU platform access cookie's authorisation." + ) + return () + + if "Login" in str(page_title): + logger.warning( + "Authentication redirected to login page. " + "SU platform access cookie is invalid or expired." + ) + return () + + profile_section_html: bs4.Tag | bs4.NavigableString | None = response_object.find( + "div", {"id": "profile_main"} + ) + + if profile_section_html is None: + logger.warning( + "Couldn't find the profile section of the user " + "when scraping the SU platform's website HTML." + ) + logger.debug("Retrieved HTML: %s", response_object.text) + return () + + user_name: bs4.Tag | bs4.NavigableString | int | None = profile_section_html.find("h1") + + if not isinstance(user_name, bs4.Tag): + logger.warning("Found user profile on the SU platform but couldn't find their name.") + logger.debug("Retrieved HTML: %s", response_object.text) + return () + + parsed_html: bs4.Tag | bs4.NavigableString | None = response_object.find( + "ul", {"id": "ulOrgs"} + ) + + if parsed_html is None or isinstance(parsed_html, bs4.NavigableString): + NO_ADMIN_TABLE_MESSAGE: Final[str] = ( + f"Failed to retrieve the admin table for user: {user_name.string}. " + "Please check you have used the correct SU platform access token!" + ) + logger.warning(NO_ADMIN_TABLE_MESSAGE) + return () + + organisations: Iterable[str] = [ + list_item.get_text(strip=True) for list_item in parsed_html.find_all("li") + ] + + logger.debug( + "SU platform access cookie has admin authorisation to: %s as user %s", + organisations, + user_name.text, + ) + + return organisations diff --git a/utils/msl/core.py b/utils/msl/core.py new file mode 100644 index 00000000..32a90baa --- /dev/null +++ b/utils/msl/core.py @@ -0,0 +1,65 @@ +"""Functions to enable interaction with MSL based SU websites.""" + +import logging +from typing import TYPE_CHECKING + +import aiohttp +from bs4 import BeautifulSoup + +from config import settings +from utils import GLOBAL_SSL_CONTEXT + +if TYPE_CHECKING: + from collections.abc import Mapping, Sequence + from http.cookies import Morsel + from logging import Logger + from typing import Final + +__all__: "Sequence[str]" = () + + +logger: "Final[Logger]" = logging.getLogger("TeX-Bot") + + +BASE_HEADERS: "Final[Mapping[str, str]]" = { + "Cache-Control": "no-cache", + "Pragma": "no-cache", + "Expires": "0", +} + +BASE_COOKIES: "Final[Mapping[str, str]]" = { + ".ASPXAUTH": settings["SU_PLATFORM_ACCESS_COOKIE"], +} + +ORGANISATION_ID: "Final[str]" = settings["ORGANISATION_ID"] + +ORGANISATION_ADMIN_URL: "Final[str]" = ( + f"https://www.guildofstudents.com/organisation/admin/{ORGANISATION_ID}/" +) + + +async def get_msl_context(url: str) -> tuple[dict[str, str], dict[str, str]]: + """Get the required context headers, data and cookies to make a request to MSL.""" + http_session: aiohttp.ClientSession = aiohttp.ClientSession( + headers=BASE_HEADERS, + cookies=BASE_COOKIES, + ) + data_fields: dict[str, str] = {} + cookies: dict[str, str] = {} + async with http_session, http_session.get(url=url, ssl=GLOBAL_SSL_CONTEXT) as field_data: + data_response = BeautifulSoup( + markup=await field_data.text(), + features="html.parser", + ) + + for field in data_response.find_all(name="input"): + if field.get("name") and field.get("value"): + data_fields[field.get("name")] = field.get("value") + + for cookie in field_data.cookies: + cookie_morsel: Morsel[str] | None = field_data.cookies.get(cookie) + if cookie_morsel is not None: + cookies[cookie] = cookie_morsel.value + cookies[".ASPXAUTH"] = settings["MEMBERS_LIST_AUTH_SESSION_COOKIE"] + + return data_fields, cookies