diff --git a/commands.py b/commands.py new file mode 100644 index 0000000..3c07ccb --- /dev/null +++ b/commands.py @@ -0,0 +1,238 @@ +import discord +from utils import CONFIG, save_config, is_bot_owner +from discord.ext import commands + + +async def setup_commands(client: commands.Bot) -> None: + tree = client.tree + + # ✅ Toggle Debug Mode + @tree.command(name="toggle_debug", description="Toggle debug mode") + async def toggle_debug(interaction: discord.Interaction) -> None: + if not is_bot_owner(interaction): + await interaction.response.send_message( + "❌ You do not have permission.", ephemeral=True + ) + return + + CONFIG["debug"] = not CONFIG.get("debug", False) + save_config() + + await interaction.response.send_message( + f"✅ Debug mode {'enabled' if CONFIG['debug'] else 'disabled'}." + ) + + # ✅ Set Refresh Time + @tree.command(name="set_refresh_time", description="Set bot refresh time") + async def set_refresh_time(interaction: discord.Interaction, seconds: int) -> None: + if not is_bot_owner(interaction): + await interaction.response.send_message( + "❌ You do not have permission.", ephemeral=True + ) + return + + CONFIG["refresh_time"] = seconds + save_config() + + await interaction.response.send_message( + f"✅ Refresh time updated to {seconds} seconds." + ) + + # ✅ Set Game TTL + @tree.command(name="set_game_ttl", description="Set game timeout duration") + async def set_game_ttl(interaction: discord.Interaction, seconds: int) -> None: + if not is_bot_owner(interaction): + await interaction.response.send_message( + "❌ You do not have permission.", ephemeral=True + ) + return + + CONFIG["game_ttl"] = seconds + save_config() + + await interaction.response.send_message( + f"✅ Game TTL updated to {seconds} seconds." + ) + + # ✅ Add Game Type + @tree.command(name="add_game_type", description="Add a new game type") + async def add_game_type( + interaction: discord.Interaction, code: str, name: str + ) -> None: + if not is_bot_owner(interaction): + await interaction.response.send_message( + "❌ You do not have permission.", ephemeral=True + ) + return + + CONFIG["game_types"][code.upper()] = name + save_config() + + await interaction.response.send_message( + f"✅ Added game type: `{code.upper()}` - {name}" + ) + + # ✅ Set Discord Channel ID + @tree.command(name="set_channel", description="Set the bot's Discord channel") + async def set_channel( + interaction: discord.Interaction, channel: discord.TextChannel + ) -> None: + if not is_bot_owner(interaction): + await interaction.response.send_message( + "❌ You do not have permission.", ephemeral=True + ) + return + + CONFIG["discord_channel_id"] = channel.id + save_config() + + await interaction.response.send_message( + f"✅ Discord channel set to {channel.mention}." + ) + + # ✅ Add a Bot Owner + @tree.command(name="add_bot_owner", description="Add a bot owner by user ID") + async def add_bot_owner(interaction: discord.Interaction, user_id: int) -> None: + if not is_bot_owner(interaction): + await interaction.response.send_message( + "❌ You do not have permission.", ephemeral=True + ) + return + + CONFIG["bot_owners"].append(user_id) + save_config() + + await interaction.response.send_message( + f"✅ User with ID `{user_id}` added as a bot owner." + ) + + # ✅ Remove a Bot Owner + @tree.command(name="remove_bot_owner", description="Remove a bot owner by user ID") + async def remove_bot_owner(interaction: discord.Interaction, user_id: int) -> None: + if not is_bot_owner(interaction): + await interaction.response.send_message( + "❌ You do not have permission.", ephemeral=True + ) + return + + if user_id in CONFIG["bot_owners"]: + CONFIG["bot_owners"].remove(user_id) + save_config() + await interaction.response.send_message( + f"✅ Removed user with ID `{user_id}` from bot owners." + ) + else: + await interaction.response.send_message( + f"⚠ User with ID `{user_id}` is not a bot owner.", ephemeral=True + ) + + # ✅ Add Difficulty + @tree.command(name="add_difficulty", description="Add a new difficulty level") + async def add_difficulty(interaction: discord.Interaction, difficulty: str) -> None: + if not is_bot_owner(interaction): + await interaction.response.send_message( + "❌ You do not have permission.", ephemeral=True + ) + return + + CONFIG["difficulties"].append(difficulty) + save_config() + + await interaction.response.send_message(f"✅ Added difficulty: `{difficulty}`.") + + # ✅ Remove Difficulty + @tree.command(name="remove_difficulty", description="Remove a difficulty level") + async def remove_difficulty( + interaction: discord.Interaction, difficulty: str + ) -> None: + if not is_bot_owner(interaction): + await interaction.response.send_message( + "❌ You do not have permission.", ephemeral=True + ) + return + + if difficulty in CONFIG["difficulties"]: + CONFIG["difficulties"].remove(difficulty) + save_config() + await interaction.response.send_message( + f"✅ Removed difficulty: `{difficulty}`." + ) + else: + await interaction.response.send_message( + f"⚠ Difficulty `{difficulty}` not found.", ephemeral=True + ) + + # ✅ Set Tick Rate + @tree.command(name="set_tick_rate", description="Set a tick rate") + async def set_tick_rate( + interaction: discord.Interaction, rate: int, name: str + ) -> None: + if not is_bot_owner(interaction): + await interaction.response.send_message( + "❌ You do not have permission.", ephemeral=True + ) + return + + CONFIG["tick_rates"][str(rate)] = name + save_config() + + await interaction.response.send_message( + f"✅ Set tick rate `{rate}` to `{name}`." + ) + + # ✅ Remove Tick Rate + @tree.command(name="remove_tick_rate", description="Remove a tick rate") + async def remove_tick_rate(interaction: discord.Interaction, rate: int) -> None: + if not is_bot_owner(interaction): + await interaction.response.send_message( + "❌ You do not have permission.", ephemeral=True + ) + return + + rate_str = str(rate) + if rate_str in CONFIG["tick_rates"]: + del CONFIG["tick_rates"][rate_str] + save_config() + await interaction.response.send_message(f"✅ Removed tick rate `{rate}`.") + else: + await interaction.response.send_message( + f"⚠ Tick rate `{rate}` not found.", ephemeral=True + ) + + # ✅ Add Game Option + @tree.command(name="add_game_option", description="Add a new game option") + async def add_game_option( + interaction: discord.Interaction, key: str, value: str + ) -> None: + if not is_bot_owner(interaction): + await interaction.response.send_message( + "❌ You do not have permission.", ephemeral=True + ) + return + + CONFIG["game_options"][key] = value + save_config() + + await interaction.response.send_message( + f"✅ Added game option: `{key}` - {value}" + ) + + # ✅ Remove Game Option + @tree.command(name="remove_game_option", description="Remove a game option") + async def remove_game_option(interaction: discord.Interaction, key: str) -> None: + if not is_bot_owner(interaction): + await interaction.response.send_message( + "❌ You do not have permission.", ephemeral=True + ) + return + + if key in CONFIG["game_options"]: + del CONFIG["game_options"][key] + save_config() + await interaction.response.send_message(f"✅ Removed game option `{key}`.") + else: + await interaction.response.send_message( + f"⚠ Game option `{key}` not found.", ephemeral=True + ) + + await client.tree.sync() diff --git a/config.json b/config.json new file mode 100644 index 0000000..7458071 --- /dev/null +++ b/config.json @@ -0,0 +1,40 @@ +{ + "debug": false, + "discord_channel_id": 1061483226767556719, + "refresh_time": 30, + "game_ttl": 120, + "bot_owners": [], + "difficulties": [ + "Normal", + "Nightmare", + "Hell" + ], + "tick_rates": { + "20": "Normal", + "30": "Fast", + "40": "Faster", + "50": "Fastest" + }, + "game_types": { + "DRTL": "DevilutionX (Diablo)", + "DSHR": "DevilutionX (Diablo Shareware)", + "HRTL": "DevilutionX (Hellfire)", + "HSHR": "DevilutionX (Hellfire Shareware)", + "IRON": "sixcy - Ironman", + "MEMD": "DakkDaniels - Middle Earth", + "DRDX": "ikonomov - DiabloX", + "DWKD": "wkdgmr - wkdmod (Diablo)", + "HWKD": "wkdgmr - wkdmod (Hellfire)", + "LTDR": "kphoenix - Lord of Terror (Diablo)", + "LTDS": "kphoenix - Lord of Terror (Diablo Shareware)", + "LTHR": "kphoenix - Lord of Terror (Hellfire)", + "LTHS": "kphoenix - Lord of Terror (Hellfire Shareware)" + }, + "game_options": { + "run_in_town": "Run in Town", + "full_quests": "Quests", + "theo_quest": "Theo Quest", + "cow_quest": "Cow Quest", + "friendly_fire": "Friendly Fire" + } +} \ No newline at end of file diff --git a/discord_bot.py b/discord_bot.py index 6cfe3d7..7571519 100644 --- a/discord_bot.py +++ b/discord_bot.py @@ -1,295 +1,32 @@ -import asyncio -from collections import deque import discord -import json -import logging -import math -import re -import time -from typing import Any, Deque, Dict, List, Optional +import asyncio +from discord.ext import commands +from game_manager import refresh_game_list +from commands import setup_commands +from utils import CONFIG -logger = logging.getLogger(__name__) +# Setup Discord Bot intents = discord.Intents.default() -intents.message_content = True - -client = discord.Client(intents=intents) -channel: Optional[discord.TextChannel] = None - -config: Dict[str, Any] = { - 'channel': 1061483226767556719, - 'game_ttl': 120, - 'refresh_seconds': 60, - 'banlist_file': './banlist', - 'gamelist_program': './devilutionx-gamelist' -} - -def escape_discord_formatting_characters(text: str) -> str: - return re.sub(r'([-\\*_#|~:@[\]()<>`])', r'\\\1', text) - - -def format_game_message(game: Dict[str, Any]) -> str: - ended = time.time() - game['last_seen'] >= config['game_ttl'] - text = '' - if ended: - text += '~~' + game['id'].upper() + '~~' - else: - text += '**' + game['id'].upper() + '**' - match game['type']: - case 'DRTL': - text += ' <:diabloico:760201452957335552>' - case 'DSHR': - text += ' <:diabloico:760201452957335552> (spawn)' - case 'HRTL': - text += ' <:hellfire:766901810580815932>' - case 'HSHR': - text += ' <:hellfire:766901810580815932> (spawn)' - case 'IRON': - text += ' Ironman' - case 'MEMD': - text += ' <:one_ring:1061898681504251954>' - case 'DRDX': - text += ' <:diabloico:760201452957335552> X' - case 'DWKD': - text += ' <:mod_wkd:1097321063077122068> modDiablo' - case 'HWKD': - text += ' <:mod_wkd:1097321063077122068> modHellfire' - case _: - text += ' ' + game['type'] - - text += ' ' + game['version'] - - match game['tick_rate']: - case 20: - text += '' - case 30: - text += ' Fast' - case 40: - text += ' Faster' - case 50: - text += ' Fastest' - case _: - text += ' speed: ' + str(game['tick_rate']) - - match game['difficulty']: - case 0: - text += ' Normal' - case 1: - text += ' Nightmare' - case 2: - text += ' Hell' - - attributes = [] - if game['run_in_town']: - attributes.append('Run in Town') - if game['full_quests']: - attributes.append('Quests') - if game['theo_quest'] and game['type'] != 'DRTL': - attributes.append('Theo Quest') - if game['cow_quest'] and game['type'] != 'DRTL': - attributes.append('Cow Quest') - if game['friendly_fire']: - attributes.append('Friendly Fire') - - if len(attributes) != 0: - text += ' (' - text += ', '.join(attributes) - text += ')' - - text += '\nPlayers: **' + '**, **'.join([escape_discord_formatting_characters(name) for name in game['players']]) + '**' - text += '\nStarted: ' - if ended: - text += '\nEnded after: `' + format_time_delta(round((time.time() - game['first_seen']) / 60)) + '`' - - return text - - -def format_status_message(current_online: int) -> str: - if current_online == 1: - return 'There is currently **' + str(current_online) + '** public game.' - return 'There are currently **' + str(current_online) + '** public games.' - - -def format_time_delta(minutes: int) -> str: - if minutes < 2: - return '1 minute' - elif minutes < 60: - return str(minutes) + ' minutes' - - text = '' - if minutes < 120: - text += '1 hour' - minutes -= 60 - else: - hours = math.floor(minutes / 60) - text += str(hours) + ' hours' - minutes -= hours * 60 - - if minutes > 0: - text += ' and ' + format_time_delta(minutes) - - return text - - -def any_player_name_is_invalid(players: List[str]) -> bool: - for name in players: - # using the same restricted character list as DevilutionX, see - # https://github.com/diasurgical/devilutionX/blob/0eda8d9367e08cea08b2ad81e1ce534e927646d6/Source/DiabloUI/diabloui.cpp#L649 - if re.search(r'[,<>%&\\"?*#/: ]', name): - return True - - for char in name: - if ord(char) < 32 or ord(char) > 126: - # ASCII control characters or anything outside the basic latin set aren't allowed - # in the current DevilutionX codebase, see - # https://github.com/diasurgical/devilutionX/blob/0eda8d9367e08cea08b2ad81e1ce534e927646d6/Source/DiabloUI/diabloui.cpp#L654 - return True - - return False - - -def any_player_name_contains_a_banned_word(players: List[str]) -> bool: - if config['banlist_file'] != '': - try: - with open(config['banlist_file'], 'r') as ban_list_file: - words = set([line.strip().upper() for line in ban_list_file.read().split('\n') if line.strip()]) - - for name in players: - for word in words: - if word in name.upper(): - return True - except: - logger.warn('Unable to load banlist file') - - return False - - -async def update_message(message: discord.Message, text: str) -> Optional[discord.Message]: - if message.content != text: - try: - message = await message.edit(content=text) - except discord.errors.NotFound: - return None - return message - - -async def send_message(text: str) -> discord.Message: - assert isinstance(channel, discord.TextChannel) - return await channel.send(text) - - -async def background_task() -> None: - known_games: Dict[str, Dict[str, Any]] = {} - active_messages: Deque[discord.Message] = deque() - - last_refresh = 0.0 - while True: - try: - sleep_time = config['refresh_seconds'] - (time.time() - last_refresh) - if sleep_time > 0: - await asyncio.sleep(sleep_time) - last_refresh = time.time() - - # Call the external program and get the output - proc = await asyncio.create_subprocess_shell( - config['gamelist_program'], - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE) - - try: - stdout, stderr = await asyncio.wait_for(proc.communicate(), 30) - except TimeoutError: - proc.terminate() - continue - output = stdout.decode() - if not output: - continue - - # Load the output as a JSON list - games = json.loads(output) - - logger.info('Refreshing game list - ' + str(len(games)) + ' games') - - for game in games: - if any_player_name_is_invalid(game['players']) or any_player_name_contains_a_banned_word(game['players']): - continue - - key = game['id'].upper() - if key in known_games: - known_games[key]['players'] = game['players'] - else: - known_games[key] = game - known_games[key]['first_seen'] = time.time() - - known_games[key]['last_seen'] = time.time() - - ended_games = [key for key, game in known_games.items() if time.time() - game['last_seen'] >= config['game_ttl']] - - for key in ended_games: - if active_messages: - await update_message(active_messages.popleft(), format_game_message(known_games[key])) - del known_games[key] - - message_index = 0 - for game in known_games.values(): - message_text = format_game_message(game) - if message_index < len(active_messages): - message = await update_message(active_messages[message_index], message_text) - assert message is not None - active_messages[message_index] = message - else: - message = await send_message(message_text) - assert message is not None - active_messages.append(message) - message_index += 1 - - game_count = len(known_games) - if (len(active_messages) <= game_count): - message = await send_message(format_status_message(game_count)) - assert message is not None - active_messages.append(message) - else: - await update_message(active_messages[game_count], format_status_message(game_count)) - - activity = discord.Activity(name='Games online: '+str(game_count), type=discord.ActivityType.watching) - await client.change_presence(activity=activity) - except discord.DiscordException as discord_error: - logger.warn(repr(discord_error)) +client = commands.Bot(command_prefix="!", intents=intents) @client.event async def on_ready() -> None: - logger.info(f'We have logged in as {client.user}') - - maybeChannel = client.get_channel(config['channel']) - assert isinstance(maybeChannel, discord.TextChannel) - - global channel - channel = maybeChannel - await background_task() + print(f"✅ Logged in as {client.user}") + await setup_commands(client) + client.loop.create_task(background_task()) -def run(runtimeConfig: Dict[str, Any]) -> None: - assert 'token' in runtimeConfig - - for key, value in runtimeConfig.items(): - config[key] = value - - client.run(config['token']) - - -def main() -> None: - logger.setLevel(logging.INFO) - handler = logging.StreamHandler() - formatter = logging.Formatter('[{asctime}] [{levelname:<8}] {name}: {message}', '%Y-%m-%d %H:%M:%S', style='{') - handler.setFormatter(formatter) - logger.addHandler(handler) - - with open('./discord_bot.json', 'r') as file: - runtimeConfig = json.load(file) +async def background_task() -> None: + """Periodically refreshes the game list.""" + while True: + await refresh_game_list(client) + await asyncio.sleep(CONFIG["refresh_time"]) - run(runtimeConfig) +# Run Bot +with open("./discord_bot_token", "r") as file: + token = file.readline().strip() -if __name__ == '__main__': - main() +client.run(token) diff --git a/game_manager.py b/game_manager.py new file mode 100644 index 0000000..461a919 --- /dev/null +++ b/game_manager.py @@ -0,0 +1,193 @@ +import discord +import json +import time +import os +import asyncio +from utils import CONFIG, debug_print, format_game_embed +from typing import Dict, List, Any + +game_list: Dict[str, Dict[str, Any]] = {} + + +async def fetch_game_list() -> List[Dict[str, Any]]: + """Fetches the game list by running the external program.""" + try: + exe_name = ( + "devilutionx-gamelist.exe" if os.name == "nt" else "./devilutionx-gamelist" + ) + print(f"🔄 Running: {exe_name}") + + proc = await asyncio.create_subprocess_shell( + exe_name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE + ) + try: + stdout, stderr = await asyncio.wait_for(proc.communicate(), 30) + except asyncio.TimeoutError: + proc.terminate() # Kill the process if it takes too long + print("⚠️ Timeout: devilutionx-gamelist took too long to respond.") + return [] + + if stderr: + debug_print(f"🔍 [DEBUG] ZeroTier Logs:\n{stderr.decode().strip()}") + + output = stdout.decode().strip() + debug_print(f"📜 Raw output:\n{output if output else '⚠ No output received!'}") + + if not output: + return [] + + try: + data = json.loads(output) + if isinstance(data, list) and all(isinstance(game, dict) for game in data): + print(f"✅ Successfully parsed JSON. {len(data)} game(s) found.") + return data + else: + print(f"❌ JSON structure is not valid: {data}") + return [] + except json.JSONDecodeError as e: + print(f"❌ JSON Parsing Error: {e}") + return [] + + except Exception as e: + print(f"❌ Error fetching game list: {e}") + return [] + + +async def fetch_game_list_with_retries( + retries: int = 3, delay: int = 5 +) -> List[Dict[str, Any]]: + """Fetches game list with retries if the result is empty.""" + for attempt in range(retries): + games = await fetch_game_list() # Try to fetch game list + if games: # If we got games, return them + return games + + print( + f"⚠️ Attempt {attempt + 1}: Received empty game list. Retrying in {delay}s..." + ) + await asyncio.sleep(delay) + + print("❌ No games found after multiple attempts. Assuming closure.") + return [] # Return empty list if still no data after retries + + +async def refresh_game_list(client: discord.Client) -> None: + """Fetches and updates the game list from the external source.""" + print("🔄 Refreshing game list...") + + channel = client.get_channel(CONFIG["discord_channel_id"]) + if not isinstance(channel, discord.TextChannel): # Validate channel type + print("❌ Channel not found or invalid! Aborting refresh.") + return + + games = await fetch_game_list_with_retries() + current_time = time.time() + + print(f"📥 Fetched {len(games)} games from external source.") + + # Process newly fetched games + for game in games: + if not isinstance(game, dict): # Ensure game data is a dictionary + print(f"⚠️ Invalid game format: {game}") + continue + + game_id = game.get("id", "").upper() + if not game_id: # Ensure game ID is valid + print(f"⚠️ Skipping game with missing ID: {game}") + continue + + if game_id in game_list: + game_list[game_id]["last_seen"] = current_time + print(f"✅ Updated last_seen for {game_id}.") + continue + + # Add new game entry + game["first_seen"] = current_time + game["last_seen"] = current_time + game_list[game_id] = game + + print(f"➕ New game added: {game_id}") + + # Create and send embed + embed = format_game_embed(game) + img_path = f"images/{game['type']}.png" + + try: + if os.path.exists(img_path): + file = discord.File(img_path, filename=f"{game['type']}.png") + embed.set_thumbnail(url=f"attachment://{game['type']}.png") + game_list[game_id]["message"] = await channel.send( + embed=embed, file=file + ) + debug_print(f"📤 Sent embed with image for {game_id}.") + else: + game_list[game_id]["message"] = await channel.send(embed=embed) + debug_print(f"📤 Sent embed for {game_id} (no image).") + + except Exception as e: + print(f"❌ Error sending embed for {game_id}: {e}") + + # Handle expired games + expired_games = [] + for game_id, game in list(game_list.items()): + time_since_last_seen = current_time - game["last_seen"] + + debug_print( + f"⏳ Checking expiration for {game_id}: last seen {time_since_last_seen:.2f}s ago (TTL: {CONFIG['game_ttl']}s)" + ) + + if time_since_last_seen >= CONFIG["game_ttl"]: + try: + if "message" not in game or not game["message"]: + print(f"⚠️ Warning: No message object for {game_id}. Skipping.") + continue + + embed = format_game_embed( + game + ) # Generate the updated embed (turns red) + + bot_permissions = None + if isinstance(channel, discord.TextChannel) and channel.guild: + bot_permissions = channel.permissions_for(channel.guild.me) + else: + print( + f"⚠ Error: Cannot check permissions for {CONFIG['discord_channel_id']} (Invalid channel type)" + ) + continue + + if not bot_permissions.manage_messages: + print(f"🚨 Missing permission to edit messages in {channel.name}.") + continue + + await game["message"].edit(embed=embed) # Update the message in Discord + print(f"🔴 Marked game {game_id} as expired and updated embed.") + + except discord.errors.NotFound: + print( + f"⚠️ Warning: Message for game {game_id} not found. It may have been deleted." + ) + + except discord.errors.Forbidden: + print( + f"🚨 Error: Missing permission to edit messages in {channel.name}." + ) + + except Exception as e: + print(f"❌ Error updating embed for expired game {game_id}: {e}") + + expired_games.append(game_id) # Mark for removal + + # Remove expired games + for game_id in expired_games: + del game_list[game_id] + print(f"🗑 Removed expired game {game_id} from tracking.") + + # Update bot status with the number of active games + current_online = len(game_list) + activity = discord.Activity( + name=f"Games online: {current_online}", type=discord.ActivityType.watching + ) + await client.change_presence(activity=activity) + print(f"🎮 Updated bot status: Watching {current_online} games online.") + + print("✅ Game list refresh complete.") diff --git a/images/DRTL.png b/images/DRTL.png new file mode 100644 index 0000000..fa2c8e1 Binary files /dev/null and b/images/DRTL.png differ diff --git a/images/DSHR.png b/images/DSHR.png new file mode 100644 index 0000000..496e491 Binary files /dev/null and b/images/DSHR.png differ diff --git a/images/HRTL.png b/images/HRTL.png new file mode 100644 index 0000000..010d798 Binary files /dev/null and b/images/HRTL.png differ diff --git a/images/HSHR.png b/images/HSHR.png new file mode 100644 index 0000000..34113ab Binary files /dev/null and b/images/HSHR.png differ diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..3a2f23e --- /dev/null +++ b/utils.py @@ -0,0 +1,147 @@ +import json +import discord +import os +import time +from typing import Dict, Any +import re + + +def debug_print( + *args: Any, sep: str = " ", end: str = "\n", file: Any = None, flush: bool = False +) -> None: + """Prints messages only if debug mode is enabled.""" + if CONFIG.get("debug", False): + print(*args, sep=sep, end=end, file=file, flush=flush) + + +FORBIDDEN_CHARS = r'[,<>%&\\"?*#/: ]' # Characters not allowed + + +def sanitize_player_name(name: str) -> str: + """Removes forbidden characters from a player's name.""" + return re.sub(FORBIDDEN_CHARS, "", name) # Strip invalid characters + + +def load_banlist() -> set[str]: + """Loads the banlist from file and returns a set of bad words.""" + try: + with open("./banlist", "r") as file: + return {line.strip().upper() for line in file if line.strip()} + except FileNotFoundError: + print("⚠ Warning: Banlist file not found. No words will be filtered.") + return set() + + +BANNED_WORDS = load_banlist() + + +def censor_bad_words(name: str) -> str: + """Replaces banned words in a player's name with asterisks.""" + name_upper = name.upper() + for bad_word in BANNED_WORDS: + if bad_word in name_upper: + masked_word = "*" * len(bad_word) # Replace with asterisks + pattern = re.compile( + re.escape(bad_word), re.IGNORECASE + ) # Case-insensitive match + name = pattern.sub(masked_word, name) # Replace in original case + return name + + +CONFIG: Dict[str, Any] = {} + + +def load_config() -> None: + """Loads the bot configuration from file and stores it globally.""" + global CONFIG + with open("config.json", "r") as f: + CONFIG = json.load(f) + + +# Load Global config immediately +load_config() + + +def save_config() -> None: + """Saves the current global config back to file.""" + with open("config.json", "w") as f: + json.dump(CONFIG, f, indent=4) + + +def is_bot_owner(interaction: discord.Interaction) -> bool: + """Check if user is a bot owner.""" + if isinstance(interaction.user, discord.Member): # Ensure it's a Member + return interaction.user.id in CONFIG["bot_owners"] + return False # Default to False if it's not a Member + + +def format_time_hhmmss(seconds: int) -> str: + """Formats elapsed time as HH:MM:SS or DD:HH:MM:SS if over 24 hours.""" + minutes, seconds = divmod(seconds, 60) + hours, minutes = divmod(minutes, 60) + days, hours = divmod(hours, 24) + + if days > 0: + return f"{days:02}:{hours:02}:{minutes:02}:{seconds:02}" # DD:HH:MM:SS + return f"{hours:02}:{minutes:02}:{seconds:02}" # HH:MM:SS + + +def format_game_embed(game: Dict[str, Any]) -> discord.Embed: + """Formats the game data into a Discord embed.""" + embed = discord.Embed(color=discord.Colour.green()) # Default: Green for active + + current_time = time.time() + expired = (current_time - game["last_seen"]) >= CONFIG["game_ttl"] + + # Game ID & Expired Status + game_name = game["id"].upper() + if expired: + embed.colour = discord.Colour.red() # Change to Red + game_name = "❌" + + # Core Game Info + difficulty = ( + CONFIG["difficulties"][game["difficulty"]] + if 0 <= game["difficulty"] < len(CONFIG["difficulties"]) + else "Unknown" + ) + speed = CONFIG["tick_rates"].get( + str(game["tick_rate"]), f"Custom ({game['tick_rate']})" + ) + players = ", ".join( + censor_bad_words(sanitize_player_name(player)) for player in game["players"] + ) + + options = [ + CONFIG["game_options"].get(opt, opt) + for opt in game + if game.get(opt) and opt in CONFIG["game_options"] + ] + options_text = ", ".join(options) if options else "None" + + # Determine elapsed time display + elapsed_seconds = int(current_time - game["first_seen"]) + elapsed_time = format_time_hhmmss(elapsed_seconds) + + if expired: + time_display = f"🕒 `{elapsed_time}`" # Duration game was open + else: + time_display = f"⏳ " # Discord timestamp + + # Embed Content + info_text = ( + f"🎮 {CONFIG['game_types'].get(game['type'], 'Unknown Game')} ({game['version']})\n" + f"👥 {players}\n" + f"🛡️ {difficulty} | ⚡ {speed}\n" + f"🛠️ {options_text}\n" + f"{time_display}" + ) + + embed.add_field(name=game_name, value=info_text, inline=False) + + # Thumbnail + img_path = f"images/{game['type']}.png" + if os.path.exists(img_path): + embed.set_thumbnail(url=f"attachment://{game['type']}.png") + + return embed