diff --git a/README.md b/README.md index 2c6193c..f96cc6c 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,25 @@ -# Celcat Calendar Scraper +# Celcat Calendar Scraper 📆 An asynchronous Python library for scraping Celcat calendar systems. -## Installation +## Installation 🚀 ```sh pip install celcat-scraper ``` -## Usage +## Features 🌟 + +* Event attributes filtering 🔎 +* Async/await support for better performance 🔀 +* Rate limiting with adaptive backoff âŗ +* Optional caching support 💾 +* Optional reusable aiohttp session â™ģī¸ +* Automatic session management đŸĒ +* Batch processing of events đŸ“Ļ +* Error handling and retries 🚨 + +## Usage âš™ī¸ Basic example of retrieving calendar events: @@ -23,21 +34,22 @@ async def main(): url="https://university.com/calendar", username="your_username", password="your_password", - include_holidays=True + include_holidays=True, ) # Create scraper instance and get events async with CelcatScraperAsync(config) as scraper: - start_date = date.today() end_date = start_date + timedelta(days=30) - + # Recommended to store events locally and reduce the amout of requests - file_path = 'store.json' + file_path = "store.json" events = scraper.deserialize_events(file_path) - - events = await scraper.get_calendar_events(start_date, end_date, previous_events=events) - + + events = await scraper.get_calendar_events( + start_date, end_date, previous_events=events + ) + for event in events: print(f"Event {event['id']}") print(f"Course: {event['category']} - {event['course']}") @@ -45,7 +57,7 @@ async def main(): print(f"Location: {', '.join(event['rooms'])} at {', '.join(event['sites'])} - {event['department']}") print(f"Professors: {', '.join(event['professors'])}") print("---") - + # Save events for a future refresh scraper.serialize_events(events, file_path) @@ -53,12 +65,79 @@ if __name__ == "__main__": asyncio.run(main()) ``` -## Features +## Filtering 🔍 + +Celcat Calendar data is often messy, and needs to be processed before it can 
be used. +For example, the same course may have several different names in different events. +Filtering allows these attributes to be standardized. + +### Usage âš™ī¸ + +> â„šī¸ **Info**: Each filter argument is optional. When course_strip_redundant is enabled, using remembered_strips is recommended. -* Async/await support for better performance -* Rate limiting with adaptive backoff -* Optional caching support -* Optional reusable aiohttp session -* Automatic session management -* Batch processing of events -* Error handling and retries +> âš ī¸ **Warning**: Disabling filters will require you to reset your previous events and refetch to undo changes. + +```python +import asyncio +from datetime import date, timedelta +import json +from celcat_scraper import CelcatFilterConfig, FilterType, CelcatConfig, CelcatScraperAsync + +async def main(): + # Load remembered_strips from a file + remembered_strips = [] + try: + with open("remembered_strips.json", "r") as f: + remembered_strips = json.load(f) + except (FileNotFoundError, json.JSONDecodeError): + remembered_strips = [] + + # Create a list of manual course replacements + course_replacements = {"English - S2": "English", "Mathematics": "Maths"} + + # Configure a filter + filter_config = CelcatFilterConfig( + filters = { + FilterType.COURSE_TITLE, + FilterType.COURSE_STRIP_MODULES, + FilterType.COURSE_STRIP_CATEGORY, + FilterType.COURSE_STRIP_PUNCTUATION, + FilterType.COURSE_GROUP_SIMILAR, + FilterType.COURSE_STRIP_REDUNDANT, + FilterType.PROFESSORS_TITLE, + FilterType.ROOMS_TITLE, + FilterType.ROOMS_STRIP_AFTER_NUMBER, + FilterType.SITES_TITLE, + FilterType.SITES_REMOVE_DUPLICATES, + }, + course_remembered_strips=remembered_strips, + course_replacements=course_replacements, + ) + + config = CelcatConfig( + url="https://university.com/calendar", + username="your_username", + password="your_password", + include_holidays=True, + # Pass the filter as an argument + filter_config=filter_config, + ) + + async with 
CelcatScraperAsync(config) as scraper: + start_date = date.today() + end_date = start_date + timedelta(days=30) + + events = scraper.deserialize_events("store.json") + events = await scraper.get_calendar_events( + start_date, end_date, previous_events=events + ) + + scraper.serialize_events(events, "store.json") + + # Save the updated remembered_strips back to file + with open("remembered_strips.json", "w") as f: + json.dump(scraper.config.filter_config.course_remembered_strips, f) + +if __name__ == "__main__": + asyncio.run(main()) +``` diff --git a/celcat_scraper/__init__.py b/celcat_scraper/__init__.py index 7c06c2a..0fc65d4 100644 --- a/celcat_scraper/__init__.py +++ b/celcat_scraper/__init__.py @@ -2,14 +2,17 @@ This package provides a complete interface for interacting with Celcat Calendar. """ -from .config import CelcatConfig, CelcatConstants + +from .config import CelcatConfig, CelcatFilterConfig, CelcatConstants, FilterType from .exceptions import CelcatError, CelcatCannotConnectError, CelcatInvalidAuthError from .scraper import CelcatScraperAsync from .types import EventData __all__ = [ "CelcatConfig", + "CelcatFilterConfig", "CelcatConstants", + "FilterType", "CelcatScraperAsync", "EventData", "CelcatError", diff --git a/celcat_scraper/api.py b/celcat_scraper/api.py index f88b31c..69d0ade 100644 --- a/celcat_scraper/api.py +++ b/celcat_scraper/api.py @@ -16,26 +16,31 @@ _LOGGER = logging.getLogger(__name__) + class CelcatAPI: """Class for interacting with Celcat Calendar API.""" - def __init__(self): + def __init__(self, config: CelcatConfig): """Initialize the Celcat API client.""" - self.rate_limiter = RateLimiter(1/CelcatConfig.rate_limit) + self.rate_limiter = RateLimiter(config.rate_limit) self.semaphore = asyncio.Semaphore(CelcatConstants.CONCURRENT_REQUESTS) self.timeout = CelcatConstants.TIMEOUT - - async def validate_response(self, response: ClientResponse, expected_type: str = None) -> Any: + + async def validate_response( + self, response: 
ClientResponse, expected_type: str = None + ) -> Any: """Validate server response and return appropriate data type.""" if response.status != 200: - error_text = await response.text(encoding='latin1') + error_text = await response.text(encoding="latin1") raise CelcatCannotConnectError( f"Server returned status {response.status}: {error_text[:200]}" ) if expected_type == "json": if "application/json" not in response.headers.get("Content-Type", ""): - raise CelcatCannotConnectError("Expected JSON response but got different content type") + raise CelcatCannotConnectError( + "Expected JSON response but got different content type" + ) return await response.json() return await response.text() @@ -50,7 +55,9 @@ async def handle_error_response(self, response: ClientResponse) -> None: elif response.status == 429: retry_after = int(response.headers.get("Retry-After", 30)) self.rate_limiter.increase_backoff() - raise CelcatCannotConnectError(f"Rate limited. Retry after {retry_after} seconds") + raise CelcatCannotConnectError( + f"Rate limited. 
Retry after {retry_after} seconds" + ) else: raise CelcatCannotConnectError(f"HTTP {response.status}: {error_msg}") @@ -60,7 +67,7 @@ async def get_calendar_raw_data( url: str, federation_ids: str, start_date: date, - end_date: date + end_date: date, ) -> List[Dict[str, Any]]: """Fetch raw calendar data for given time period.""" _LOGGER.info("Getting calendar raw data") @@ -73,38 +80,25 @@ async def get_calendar_raw_data( "end": end_date.strftime("%Y-%m-%d"), "resType": "104", "calView": "month", - "federationIds[]": federation_ids + "federationIds[]": federation_ids, } url_calendar_data = url + "/Home/GetCalendarData" - + return await self.fetch_with_retry( - session, - "POST", - "json", - url_calendar_data, - data=calendar_data + session, "POST", "json", url_calendar_data, data=calendar_data ) async def get_side_bar_event_raw_data( - self, - session: ClientSession, - url: str, - event_id: str + self, session: ClientSession, url: str, event_id: str ) -> dict: """Fetch detailed event data by ID.""" - sidebar_data = { - "eventid": event_id - } + sidebar_data = {"eventid": event_id} url_sidebar_data = url + "/Home/GetSideBarEvent" - + return await self.fetch_with_retry( - session, - "POST", - "json", - url_sidebar_data, - data=sidebar_data + session, "POST", "json", url_sidebar_data, data=sidebar_data ) async def fetch_with_retry( @@ -113,7 +107,7 @@ async def fetch_with_retry( method: str, expected_type: str, url: str, - **kwargs + **kwargs, ) -> Any: """Make HTTP requests with retry logic.""" await self.rate_limiter.acquire() @@ -126,12 +120,14 @@ async def fetch_with_retry( async with session.request(method, url, **kwargs) as response: if response.status == 200: content_type = response.headers.get("Content-Type", "") - + if expected_type == "json": if "application/json" in content_type: data = await response.json() else: - raise CelcatCannotConnectError(f"Expected JSON response but got different content type: {content_type}") + raise CelcatCannotConnectError( + 
f"Expected JSON response but got different content type: {content_type}" + ) else: data = await response.text() @@ -143,5 +139,7 @@ async def fetch_with_retry( except ClientError as exc: self.rate_limiter.increase_backoff() if attempt == CelcatConstants.MAX_RETRIES - 1: - raise CelcatCannotConnectError(f"Failed after {CelcatConstants.MAX_RETRIES} attempts") from exc - await asyncio.sleep(min(2 ** attempt, 10)) + raise CelcatCannotConnectError( + f"Failed after {CelcatConstants.MAX_RETRIES} attempts" + ) from exc + await asyncio.sleep(min(2**attempt, 10)) diff --git a/celcat_scraper/auth.py b/celcat_scraper/auth.py index 61c2940..2ebbf0e 100644 --- a/celcat_scraper/auth.py +++ b/celcat_scraper/auth.py @@ -14,11 +14,9 @@ _LOGGER = logging.getLogger(__name__) + async def authenticate( - session: ClientSession, - url: str, - username: str, - password: str + session: ClientSession, url: str, username: str, password: str ) -> Tuple[bool, Optional[str]]: """Authenticate to Celcat. @@ -57,13 +55,13 @@ async def authenticate( login_data = { "Name": username, "Password": password, - "__RequestVerificationToken": token_element["value"] + "__RequestVerificationToken": token_element["value"], } async with session.post( f"{url}/LdapLogin/Logon", data=login_data, - headers={"Content-Type": "application/x-www-form-urlencoded"} + headers={"Content-Type": "application/x-www-form-urlencoded"}, ) as response: if response.status != 200: error_text = await response.text(encoding="latin1") @@ -80,7 +78,9 @@ async def authenticate( raise CelcatCannotConnectError("Failed to connect to Celcat service") from exc -def _process_login_response(response_url, page_content: str) -> Tuple[bool, Optional[str]]: +def _process_login_response( + response_url, page_content: str +) -> Tuple[bool, Optional[str]]: """Process login response and extract federation IDs. 
Returns: @@ -96,20 +96,29 @@ def _process_login_response(response_url, page_content: str) -> Tuple[bool, Opti if login_button_state == "Log Out": federation_ids = next( - (param.split("=")[1] for param in str(response_url).split("&") - if param.startswith("FederationIds=")), - None + ( + param.split("=")[1] + for param in str(response_url).split("&") + if param.startswith("FederationIds=") + ), + None, ) if federation_ids is None: - _LOGGER.debug("FederationIds could not be retrieved. Trying to extract from page") + _LOGGER.debug( + "FederationIds could not be retrieved. Trying to extract from page" + ) extracted = soup.find("span", class_="small") if extracted: - federation_ids = extracted.text.lstrip('-').strip() + federation_ids = extracted.text.lstrip("-").strip() if not federation_ids.isdigit(): - raise CelcatCannotConnectError(f"Federation ids could not be extracted from '{federation_ids}'") + raise CelcatCannotConnectError( + f"Federation ids could not be extracted from '{federation_ids}'" + ) else: - raise CelcatCannotConnectError("Federation ids class could not be found") + raise CelcatCannotConnectError( + "Federation ids class could not be found" + ) _LOGGER.debug("Successfully logged in to Celcat") return True, federation_ids diff --git a/celcat_scraper/config.py b/celcat_scraper/config.py index 0bbd916..81f9b6d 100644 --- a/celcat_scraper/config.py +++ b/celcat_scraper/config.py @@ -4,13 +4,16 @@ the behavior of the Celcat scraper. 
""" -from dataclasses import dataclass -from typing import Optional +from dataclasses import dataclass, field +from enum import Enum +from typing import Optional, Dict, List, Set from aiohttp import ClientSession + class CelcatConstants: """Constants for Celcat scraper configuration.""" + MAX_RETRIES = 3 CONCURRENT_REQUESTS = 5 TIMEOUT = 30 @@ -18,6 +21,53 @@ class CelcatConstants: CONNECTION_POOL_SIZE = 100 CONNECTION_KEEP_ALIVE = 120 + +class FilterType(Enum): + """Available filter types for Celcat data.""" + + COURSE_TITLE = "course_title" + COURSE_STRIP_MODULES = "course_strip_modules" + COURSE_STRIP_CATEGORY = "course_strip_category" + COURSE_STRIP_PUNCTUATION = "course_strip_punctuation" + COURSE_GROUP_SIMILAR = "course_group_similar" + COURSE_STRIP_REDUNDANT = "course_strip_redundant" + PROFESSORS_TITLE = "professors_title" + ROOMS_TITLE = "rooms_title" + ROOMS_STRIP_AFTER_NUMBER = "rooms_strip_after_number" + SITES_TITLE = "sites_title" + SITES_REMOVE_DUPLICATES = "sites_remove_duplicates" + + +@dataclass +class CelcatFilterConfig: + """Configuration for Celcat data filter. + + Attributes: + filters: Set of filters to apply + course_remembered_strips: List of previously stripped strings to be reapplied in subsequent filter instances + course_replacements: Dictionary of strings to replace in course names + """ + + filters: Set[FilterType] = field(default_factory=set) + course_remembered_strips: List[str] = field(default_factory=list) + course_replacements: Dict[str, str] = field(default_factory=dict) + + @classmethod + def with_defaults(cls) -> "CelcatFilterConfig": + """Create a filter config with default settings.""" + return cls( + filters={ + FilterType.COURSE_TITLE, + FilterType.COURSE_STRIP_MODULES, + FilterType.COURSE_STRIP_CATEGORY, + FilterType.PROFESSORS_TITLE, + FilterType.ROOMS_TITLE, + FilterType.SITES_TITLE, + FilterType.SITES_REMOVE_DUPLICATES, + } + ) + + @dataclass class CelcatConfig: """Configuration for Celcat scraper. 
@@ -30,9 +80,11 @@ class CelcatConfig: rate_limit: Minimum seconds between requests session: Optional aiohttp ClientSession to reuse """ + url: str username: str password: str + filter_config: CelcatFilterConfig = field(default_factory=CelcatFilterConfig.with_defaults) include_holidays: bool = True rate_limit: float = 0.5 session: Optional[ClientSession] = None diff --git a/celcat_scraper/exceptions.py b/celcat_scraper/exceptions.py index 381e3cd..dbca706 100644 --- a/celcat_scraper/exceptions.py +++ b/celcat_scraper/exceptions.py @@ -7,20 +7,20 @@ class CelcatError(Exception): """Base exception for all Celcat-related errors. - + All custom exceptions in this module inherit from this base class. """ class CelcatCannotConnectError(CelcatError): """Exception raised when connection to Celcat service fails. - + This may be due to network issues, server unavailability, or invalid URLs. """ class CelcatInvalidAuthError(CelcatError): """Exception raised when authentication credentials are invalid. - + This occurs when the provided username/password combination is incorrect. """ diff --git a/celcat_scraper/filter.py b/celcat_scraper/filter.py new file mode 100644 index 0000000..e4bac2d --- /dev/null +++ b/celcat_scraper/filter.py @@ -0,0 +1,290 @@ +"""Event data filter for Celcat calendar. + +This module provides functionality to clean and standardize calendar event data +retrieved from Celcat. +It offers various filtering options for each event attribute to facilitate classification. +""" + +import logging +import re +from typing import Dict, Any, List, Set +from collections import OrderedDict + +from .config import CelcatFilterConfig, FilterType + +_LOGGER = logging.getLogger(__name__) + + +class CelcatFilter: + """Filter for processing and standardizing Celcat calendar events. + + This class provides methods to clean, standardize, and organize calendar + event data from Celcat according to the provided configuration. 
+ """ + + def __init__(self, config: CelcatFilterConfig) -> None: + """Initialize the filter with the provided configuration. + + Args: + config: Configuration object containing filter settings + """ + self.config = config + + async def filter_events(self, events: List[Dict[str, Any]]) -> None: + """Apply all configured filters to the event list. + + This is the main entry point for filtering events. It applies all + individual filters based on the configuration settings. + + Args: + events: List of event dictionaries to filter + """ + _LOGGER.info("Filtering Celcat events") + + for event in events: + if event.get("course"): + await self._filter_course(event) + + if event.get("professors"): + await self._filter_professors(event) + + if event.get("rooms"): + await self._filter_rooms(event) + + if event.get("sites"): + await self._filter_sites(event) + + if FilterType.COURSE_STRIP_REDUNDANT in self.config.filters: + await self._strip_redundant_courses(events) + + if FilterType.COURSE_GROUP_SIMILAR in self.config.filters: + await self._group_similar_courses(events) + + if self.config.course_replacements: + await self._replace_courses(events, self.config.course_replacements) + + async def _filter_course(self, event: Dict[str, Any]) -> None: + """Apply configured filters to a course name. 
+ + Args: + event: Event dictionary containing course information + """ + if FilterType.COURSE_STRIP_MODULES in self.config.filters and event.get( + "modules" + ): + for module in event["modules"]: + event["course"] = re.sub( + re.escape(f" [{module}]"), + "", + event["course"], + flags=re.IGNORECASE, + ) + + if FilterType.COURSE_STRIP_CATEGORY in self.config.filters and event.get( + "category" + ): + event["course"] = re.sub( + re.escape(f" {event['category']}"), + "", + event["course"], + flags=re.IGNORECASE, + ) + + if FilterType.COURSE_STRIP_PUNCTUATION in self.config.filters: + event["course"] = re.sub(r"[.,:;!?]", "", event["course"]) + + if FilterType.COURSE_TITLE in self.config.filters: + event["course"] = event["course"].title() + + async def _filter_professors(self, event: Dict[str, Any]) -> None: + """Apply configured filters to professor names. + + Args: + event: Event dictionary containing professor information + """ + if FilterType.PROFESSORS_TITLE in self.config.filters: + for i in range(len(event["professors"])): + event["professors"][i] = event["professors"][i].title() + + async def _filter_rooms(self, event: Dict[str, Any]) -> None: + """Apply configured filters to room names. + + Args: + event: Event dictionary containing room information + """ + if FilterType.ROOMS_STRIP_AFTER_NUMBER in self.config.filters: + for i in range(len(event["rooms"])): + letter = 0 + while ( + letter < len(event["rooms"][i]) + and not event["rooms"][i][letter].isnumeric() + ): + letter += 1 + while ( + letter < len(event["rooms"][i]) + and not event["rooms"][i][letter].isalpha() + ): + letter += 1 + event["rooms"][i] = event["rooms"][i][:letter].rstrip() + + if FilterType.ROOMS_TITLE in self.config.filters: + for i in range(len(event["rooms"])): + event["rooms"][i] = event["rooms"][i].title() + + async def _filter_sites(self, event: Dict[str, Any]) -> None: + """Apply configured filters to site names. 
+ + Args: + event: Event dictionary containing site information + """ + if FilterType.SITES_REMOVE_DUPLICATES in self.config.filters: + event["sites"] = list(OrderedDict.fromkeys(event["sites"])) + + if FilterType.SITES_TITLE in self.config.filters: + for i in range(len(event["sites"])): + event["sites"][i] = event["sites"][i].title() + + async def _strip_redundant_courses(self, events: List[Dict[str, Any]]) -> None: + """Remove redundant parts from course names across all events. + + Args: + events: List of event dictionaries + """ + new_strips = None + while new_strips != []: + new_strips = await self._find_new_course_strips( + events, self.config.course_remembered_strips + ) + self.config.course_remembered_strips += new_strips + await self._strip_courses(events, self.config.course_remembered_strips) + + async def _find_new_course_strips( + self, events: List[Dict[str, Any]], previous_strips: List[str] + ) -> List[str]: + """Find new parts of course names that can be stripped. + + Args: + events: List of event dictionaries + previous_strips: List of previously identified strips + + Returns: + List of new words that could be stripped from course names + """ + courses = await self._get_courses_names(events) + new_strips = [] + for i in range(len(courses) - 1): + for j in range(i + 1, len(courses)): + strips = await self._find_course_strips( + courses[i], courses[j] + ) or await self._find_course_strips(courses[j], courses[i]) + for strip in strips: + if strip not in previous_strips and strip not in new_strips: + new_strips.append(strip) + + _LOGGER.debug(f"New items to strip: {new_strips}") + return new_strips + + async def _get_courses_names( + self, + events: List[Dict[str, Any]], + ) -> List[str]: + """Extract unique course names from all events. 
+ + Args: + events: List of event dictionaries + + Returns: + List of unique course names + """ + courses: Set[str] = set() + + for event in events: + if event.get("course") and event["course"] not in courses: + courses.add(event["course"]) + + return list(courses) + + async def _find_course_strips( + self, smaller_course: str, longer_course: str + ) -> List[str]: + """Find parts of the longer course name that can be stripped. + + Args: + smaller_course: The shorter course name + longer_course: The longer course name + + Returns: + List of words that could be stripped from course names + """ + smaller = smaller_course.lower() + longer = longer_course.lower() + + if smaller in longer: + while smaller in longer: + start = longer.index(smaller) + end = start + len(smaller) + + while start > 0 and longer[start] != " ": + start -= 1 + while end < len(longer) and longer[end] != " ": + end += 1 + + longer = longer[:start] + longer[end:] + return longer.split() + return [] + + async def _strip_courses( + self, events: List[Dict[str, Any]], items_to_strip: List[str] + ) -> None: + """Remove specified items from course names. + + Args: + events: List of event dictionaries + items_to_strip: List of words to remove from course names + """ + if FilterType.COURSE_STRIP_REDUNDANT in self.config.filters: + _LOGGER.debug(f"Items to strip: {items_to_strip}") + for event in events: + pattern_parts = [ + r"\b" + re.escape(item) + r"\b" for item in items_to_strip + ] + pattern = re.compile("|".join(pattern_parts), re.IGNORECASE) + result = pattern.sub("", event["course"]) + event["course"] = re.sub(r"\s+", " ", result).strip() + + async def _group_similar_courses(self, events: List[Dict[str, Any]]) -> None: + """Group similar course names together. 
+ + Args: + events: List of event dictionaries + """ + courses = await self._get_courses_names(events) + replacements = {} + + for i in range(len(courses) - 1): + courses_corresponding = [] + shortest_course = courses[i] + for j in range(len(courses)): + if shortest_course in courses[j]: + courses_corresponding.append(courses[j]) + elif courses[j] in shortest_course: + courses_corresponding.append(shortest_course) + shortest_course = courses[j] + + for course in courses_corresponding: + replacements[course] = shortest_course + + await self._replace_courses(events, replacements) + + async def _replace_courses( + self, events: List[Dict[str, Any]], replacements: Dict[str, str] + ) -> None: + """Replace course names according to the provided mapping. + + Args: + events: List of event dictionaries + replacements: Dictionary mapping old course names to new ones + """ + for event in events: + if event.get("course") and event["course"] in replacements: + event["course"] = replacements[event["course"]] diff --git a/celcat_scraper/scraper.py b/celcat_scraper/scraper.py index b604fb1..13f3f9f 100644 --- a/celcat_scraper/scraper.py +++ b/celcat_scraper/scraper.py @@ -16,6 +16,7 @@ from aiohttp import ClientSession, TCPConnector from .api import CelcatAPI +from .filter import CelcatFilter from .auth import authenticate from .config import CelcatConfig, CelcatConstants from .exceptions import CelcatCannotConnectError, CelcatError @@ -23,6 +24,7 @@ _LOGGER = logging.getLogger(__name__) + class CelcatScraperAsync: """Asynchronous scraper for interacting with Celcat calendar. 
@@ -46,7 +48,8 @@ def __init__(self, config: CelcatConfig) -> None: """ self._validate_config(config) self.config = config - self.api = CelcatAPI() + self.filter = CelcatFilter(config.filter_config) + self.api = CelcatAPI(config) self.federation_ids: Optional[str] = None self.session: Optional[ClientSession] = config.session self._external_session = bool(config.session) @@ -55,10 +58,10 @@ def __init__(self, config: CelcatConfig) -> None: self._headers = { "Accept-Encoding": ", ".join(CelcatConstants.COMPRESSION_TYPES), "Connection": "keep-alive", - "Keep-Alive": str(CelcatConstants.CONNECTION_KEEP_ALIVE) + "Keep-Alive": str(CelcatConstants.CONNECTION_KEEP_ALIVE), } - async def __aenter__(self) -> 'CelcatScraperAsync': + async def __aenter__(self) -> "CelcatScraperAsync": """Async context manager entry with automatic login.""" if not self.logged_in: await self.login() @@ -78,7 +81,7 @@ def _validate_config(config: CelcatConfig) -> None: if not parsed_url.scheme or not parsed_url.netloc: raise ValueError("Invalid URL format") - config.url = config.url.rstrip('/') + config.url = config.url.rstrip("/") @asynccontextmanager async def _session_context(self) -> ClientSession: @@ -89,10 +92,10 @@ async def _session_context(self) -> ClientSession: limit=CelcatConstants.CONNECTION_POOL_SIZE, enable_cleanup_closed=True, force_close=False, - keepalive_timeout=CelcatConstants.CONNECTION_KEEP_ALIVE + keepalive_timeout=CelcatConstants.CONNECTION_KEEP_ALIVE, ), headers=self._headers, - timeout=self._timeout + timeout=self._timeout, ) try: yield self.session @@ -142,10 +145,7 @@ async def login(self) -> bool: try: async with self._session_context() as session: success, federation_ids = await authenticate( - session, - self.config.url, - self.config.username, - self.config.password + session, self.config.url, self.config.username, self.config.password ) self.federation_ids = federation_ids @@ -156,7 +156,9 @@ async def login(self) -> bool: await self._cleanup_session() if 
isinstance(exc, (CelcatError, ValueError)): raise - raise CelcatCannotConnectError("Failed to connect to Celcat service") from exc + raise CelcatCannotConnectError( + "Failed to connect to Celcat service" + ) from exc async def _process_event(self, event: dict) -> EventData: """Convert raw event data into EventData object.""" @@ -168,33 +170,33 @@ async def _process_event(self, event: dict) -> EventData: else datetime.fromisoformat(event["end"]) ) - cleaned_sites = list({site.title() for site in (event.get("sites") or []) if site}) - processed_event: EventData = { "id": event["id"], "start": event_start, "end": event_end, "all_day": event.get("allDay", False), - "category": event.get("eventCategory", ""), + "category": event.get("eventCategory", "") or "", "course": "", "rooms": [], "professors": [], - "modules": event.get("modules", []), - "department": event.get("department", ""), - "sites": cleaned_sites, - "faculty": event.get("faculty", ""), - "notes": "" + "modules": event.get("modules", []) or [], + "department": event.get("department", "") or "", + "sites": event.get("sites", []) or [], + "faculty": event.get("faculty", "") or "", + "notes": "", } - event_data = await self.api.get_side_bar_event_raw_data(self.session, self.config.url, event["id"]) + event_data = await self.api.get_side_bar_event_raw_data( + self.session, self.config.url, event["id"] + ) for element in event_data["elements"]: if element["entityType"] == 100 and processed_event["course"] == "": - processed_event["course"] = element["content"].replace(f" [{element['federationId']}]", "").replace(f" {event['eventCategory']}", "").title() + processed_event["course"] = element["content"] elif element["entityType"] == 101: - processed_event["professors"].append(element["content"].title()) + processed_event["professors"].append(element["content"]) elif element["entityType"] == 102: - processed_event["rooms"].append(element["content"].title()) + processed_event["rooms"].append(element["content"]) 
elif element["isNotes"] and element.get("content"): processed_event["notes"] = element["content"] @@ -205,6 +207,7 @@ async def _process_event(self, event: dict) -> EventData: async def _process_event_batch(self, events: List[dict]) -> List[EventData]: """Process multiple events concurrently.""" + async def process_single_event(event: dict) -> Optional[EventData]: try: if not event["allDay"] or self.config.include_holidays: @@ -217,7 +220,10 @@ async def process_single_event(event: dict) -> Optional[EventData]: results = await asyncio.gather(*tasks, return_exceptions=True) _LOGGER.info(f"Finished processing new events with {len(events)} requests") - return [r for r in results if r is not None and not isinstance(r, Exception)] + events = [r for r in results if r is not None and not isinstance(r, Exception)] + + await self.filter.filter_events(events) + return events @staticmethod def serialize_events(events: List[EventData], file_path: str) -> None: @@ -227,12 +233,13 @@ def serialize_events(events: List[EventData], file_path: str) -> None: events: List of EventData to serialize file_path: Path where to save the JSON file """ + def datetime_handler(obj): if isinstance(obj, datetime): return obj.isoformat() raise TypeError(f"Object of type {type(obj)} is not JSON serializable") - with open(file_path, 'w', encoding='utf-8') as f: + with open(file_path, "w", encoding="utf-8") as f: json.dump(events, f, default=datetime_handler, ensure_ascii=False, indent=2) @staticmethod @@ -248,7 +255,7 @@ def deserialize_events(file_path: str) -> List[EventData]: if not Path(file_path).exists(): return [] - with open(file_path, 'r', encoding='utf-8') as f: + with open(file_path, "r", encoding="utf-8") as f: data = json.load(f) for event in data: @@ -258,10 +265,7 @@ def deserialize_events(file_path: str) -> List[EventData]: return data async def get_calendar_events( - self, - start: date, - end: date, - previous_events: Optional[List[EventData]] = None + self, start: date, end: date, 
previous_events: Optional[List[EventData]] = None ) -> List[EventData]: """Get calendar events for a specified time period. @@ -293,11 +297,7 @@ async def get_calendar_events( _LOGGER.info("Retrieving calendar events for period %s to %s", start, end) calendar_raw_data = await self.api.get_calendar_raw_data( - self.session, - self.config.url, - self.federation_ids, - start, - end + self.session, self.config.url, self.federation_ids, start, end ) calendar_raw_data.sort(key=lambda x: x["start"]) @@ -335,16 +335,25 @@ async def get_calendar_events( matching_event = None for prev_event in in_range_events: - if raw_event["id"] == prev_event["id"] and ( - (raw_event["allDay"] and prev_event["all_day"]) - or (event_start == prev_event["start"] and event_end == prev_event["end"]) - ) and ( - raw_event["eventCategory"] == prev_event["category"] - ) and ( - raw_event["modules"] or [] == prev_event["modules"] - ) and ( - prev_event["all_day"] - or (prev_event["rooms"] and prev_event["rooms"][0].lower() in html.unescape(raw_event["description"]).lower()) + if ( + raw_event["id"] == prev_event["id"] + and ( + (raw_event["allDay"] and prev_event["all_day"]) + or ( + event_start == prev_event["start"] + and event_end == prev_event["end"] + ) + ) + and (raw_event["eventCategory"] == prev_event["category"]) + and ((raw_event["modules"] or []) == prev_event["modules"]) + and ( + prev_event["all_day"] + or ( + prev_event["rooms"] + and prev_event["rooms"][0].lower() + in html.unescape(raw_event["description"]).lower() + ) + ) ) ): matching_event = prev_event in_range_events.remove(prev_event) @@ -360,6 +369,7 @@ async def get_calendar_events( _LOGGER.debug("Event data requested") final_events.extend(out_of_range_events) - _LOGGER.info(f"Finished processing events with {total_requests} requests") - return sorted(final_events, key=lambda x: x["start"]) + + await self.filter.filter_events(final_events) + return sorted(final_events, key=lambda x: x["start"]) diff --git a/celcat_scraper/types.py b/celcat_scraper/types.py index 
1c37b37..e11a102 100644 --- a/celcat_scraper/types.py +++ b/celcat_scraper/types.py @@ -7,11 +7,13 @@ from datetime import datetime from typing import List, TypedDict + class EventData(TypedDict): """Type definition for event data. Represents a calendar event with all its attributes. """ + id: str start: datetime end: datetime diff --git a/celcat_scraper/utils.py b/celcat_scraper/utils.py index f21b426..2a7201c 100644 --- a/celcat_scraper/utils.py +++ b/celcat_scraper/utils.py @@ -7,10 +7,12 @@ import asyncio import time + class RateLimiter: """Rate limiter for API requests with adaptive backoff.""" - def __init__(self, calls_per_second: float = 2.0): - self.delay = 1.0 / calls_per_second + + def __init__(self, rate_limit: float = 2.0): + self.delay = rate_limit self.last_call = 0.0 self._backoff_factor = 1.0 @@ -19,7 +21,7 @@ async def acquire(self): now = time.monotonic() delay = self.delay * self._backoff_factor elapsed = now - self.last_call - if (elapsed < delay): + if elapsed < delay: await asyncio.sleep(delay - elapsed) self.last_call = time.monotonic() @@ -30,4 +32,3 @@ def increase_backoff(self): def reset_backoff(self): """Reset backoff factor on success.""" self._backoff_factor = 1.0 - diff --git a/pyproject.toml b/pyproject.toml index b0d969a..d97deeb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "celcat_scraper" -version = "1.0.0" +version = "1.1.0" dependencies = [ "aiohttp>=3.8.0", "beautifulsoup4>=4.4.0",