diff --git a/omnimcp/config.py b/omnimcp/config.py index 8a9bbd2..924f9c0 100644 --- a/omnimcp/config.py +++ b/omnimcp/config.py @@ -1,3 +1,5 @@ +# omnimcp/config.py + """Configuration management for OmniMCP.""" import os @@ -9,18 +11,21 @@ class OmniMCPConfig(BaseSettings): """Configuration settings for OmniMCP.""" - + # Claude API configuration ANTHROPIC_API_KEY: Optional[str] = None - + + # Auto-shutdown OmniParser after 60min inactivity + INACTIVITY_TIMEOUT_MINUTES: int = 60 + # OmniParser configuration OMNIPARSER_URL: Optional[str] = None - + # AWS deployment settings (for remote OmniParser) AWS_ACCESS_KEY_ID: Optional[str] = None AWS_SECRET_ACCESS_KEY: Optional[str] = None AWS_REGION: Optional[str] = "us-west-2" - + # OmniParser deployment configuration PROJECT_NAME: str = "omniparser" REPO_URL: str = "https://github.com/microsoft/OmniParser.git" @@ -30,19 +35,20 @@ class OmniMCPConfig(BaseSettings): AWS_EC2_USER: str = "ubuntu" PORT: int = 8000 # FastAPI port COMMAND_TIMEOUT: int = 600 # 10 minutes - + # Debug settings DEBUG: bool = False LOG_LEVEL: str = "INFO" - + class Config: """Pydantic settings configuration.""" + env_file = ".env" env_file_encoding = "utf-8" - + # Allow extra fields in the settings extra = "ignore" - + # Properties for OmniParser deployment @property def CONTAINER_NAME(self) -> str: @@ -68,4 +74,4 @@ def AWS_EC2_SECURITY_GROUP(self) -> str: # Create a global config instance -config = OmniMCPConfig() \ No newline at end of file +config = OmniMCPConfig() diff --git a/omnimcp/omniparser/Dockerfile b/omnimcp/omniparser/Dockerfile index f14ea7a..4d8cc62 100644 --- a/omnimcp/omniparser/Dockerfile +++ b/omnimcp/omniparser/Dockerfile @@ -1,3 +1,5 @@ +# omnimcp/ominparser/Dockerfile + FROM nvidia/cuda:12.3.1-devel-ubuntu22.04 RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \ diff --git a/omnimcp/omniparser/client.py b/omnimcp/omniparser/client.py index 51c2e1a..166dfa3 100644 --- a/omnimcp/omniparser/client.py +++ b/omnimcp/omniparser/client.py @@ -1,3 +1,5 @@ +# omnimcp/omniparser/client.py + """Client module for interacting with the OmniParser server.""" import base64 @@ -36,14 +38,15 @@ def _ensure_server(self) -> None: # Check if any instances are running import boto3 - ec2 = boto3.resource('ec2') + + ec2 = boto3.resource("ec2") instances = ec2.instances.filter( Filters=[ - {'Name': 'tag:Name', 'Values': ['omniparser']}, - {'Name': 'instance-state-name', 'Values': ['running']} + {"Name": "tag:Name", "Values": ["omniparser"]}, + {"Name": "instance-state-name", "Values": ["running"]}, ] ) - + instance = next(iter(instances), None) if instance and instance.public_ip_address: self.server_url = f"http://{instance.public_ip_address}:8000" @@ -57,8 +60,8 @@ def _ensure_server(self) -> None: for i in range(max_retries): instances = ec2.instances.filter( Filters=[ - {'Name': 'tag:Name', 'Values': ['omniparser']}, - {'Name': 'instance-state-name', 'Values': ['running']} + {"Name": "tag:Name", "Values": ["omniparser"]}, + {"Name": "instance-state-name", "Values": ["running"]}, ] ) instance = next(iter(instances), None) @@ -69,9 +72,7 @@ def _ensure_server(self) -> None: else: raise RuntimeError("Failed to deploy server") else: - raise RuntimeError( - "No server URL provided and auto_deploy is disabled" - ) + raise RuntimeError("No server URL provided and auto_deploy is disabled") # Verify server is responsive self._check_server() @@ -101,7 +102,7 @@ def parse_image(self, image: Image.Image) -> Dict: response = requests.post( f"{self.server_url}/parse/", json={"base64_image": image_bytes}, - timeout=30 + timeout=30, ) response.raise_for_status() return response.json() @@ -112,14 +113,13 @@ def parse_image(self, image: Image.Image) -> Dict: def _image_to_base64(image: Image.Image) -> str: """Convert PIL Image to base64 string.""" import io + buffered = io.BytesIO() image.save(buffered, format="PNG") return base64.b64encode(buffered.getvalue()).decode() def visualize_results( - self, - image: Image.Image, - parsed_content: List[Dict] + self, image: Image.Image, parsed_content: List[Dict] ) -> Image.Image: """Visualize parsing results on the image. diff --git a/omnimcp/omniparser/server.py b/omnimcp/omniparser/server.py index ba51852..7a86d17 100644 --- a/omnimcp/omniparser/server.py +++ b/omnimcp/omniparser/server.py @@ -1,8 +1,14 @@ -"""Deployment module for OmniParser on AWS EC2.""" +# omnimcp/omniparser/server.py +"""Deployment module for OmniParser on AWS EC2 with on-demand startup and auto-shutdown.""" + +import datetime import os import subprocess import time +import json +import io +import zipfile from botocore.exceptions import ClientError from loguru import logger @@ -12,6 +18,15 @@ from omnimcp.config import config +# Update default configuration values +# config.AWS_EC2_INSTANCE_TYPE = "g4dn.xlarge" # 4 vCPU, 16GB RAM, T4 GPU +# config.INACTIVITY_TIMEOUT_MINUTES = 20 # Auto-shutdown after 20min inactivity + +# Lambda function name for auto-shutdown +LAMBDA_FUNCTION_NAME = f"{config.PROJECT_NAME}-auto-shutdown" +# CloudWatch rule name for monitoring +CLOUDWATCH_RULE_NAME = f"{config.PROJECT_NAME}-inactivity-monitor" + CLEANUP_ON_FAILURE = False @@ -143,8 +158,8 @@ def deploy_ec2_instance( Returns: tuple[str | None, str | None]: Instance ID and public IP if successful """ - ec2 = boto3.resource("ec2") - ec2_client = boto3.client("ec2") + ec2 = boto3.resource("ec2", region_name=config.AWS_REGION) + ec2_client = boto3.client("ec2", region_name=config.AWS_REGION) # Check for existing instances first instances = ec2.instances.filter( @@ -220,13 +235,15 @@ def deploy_ec2_instance( logger.error(f"Error managing key pair: {e}") return None, None - # Create new instance + # Create new instance with improved EBS configuration for gp3 ebs_config = { "DeviceName": "/dev/sda1", "Ebs": { "VolumeSize": disk_size, - "VolumeType": "gp3", + "VolumeType": "gp3", # Explicitly set to gp3 "DeleteOnTermination": True, + "Iops": 3000, # Default for gp3 + "Throughput": 125, # Default for gp3 }, } @@ -432,6 +449,162 @@ def execute_command(ssh_client: paramiko.SSHClient, command: str) -> None: logger.info(f"Successfully executed: {command}") +def create_auto_shutdown_infrastructure(instance_id: str) -> None: + """Create CloudWatch rule and Lambda function for auto-shutdown. + + Args: + instance_id: ID of the EC2 instance to monitor + """ + lambda_client = boto3.client("lambda", region_name=config.AWS_REGION) + events_client = boto3.client("events", region_name=config.AWS_REGION) + iam_client = boto3.client("iam", region_name=config.AWS_REGION) + + # Create IAM role for Lambda function + try: + assume_role_policy = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"Service": "lambda.amazonaws.com"}, + "Action": "sts:AssumeRole", + } + ], + } + + response = iam_client.create_role( + RoleName=f"{config.PROJECT_NAME}-lambda-role", + AssumeRolePolicyDocument=json.dumps(assume_role_policy), + ) + + role_arn = response["Role"]["Arn"] + + # Attach EC2 and CloudWatch permissions + iam_client.attach_role_policy( + RoleName=f"{config.PROJECT_NAME}-lambda-role", + PolicyArn="arn:aws:iam::aws:policy/AmazonEC2FullAccess", + ) + + iam_client.attach_role_policy( + RoleName=f"{config.PROJECT_NAME}-lambda-role", + PolicyArn="arn:aws:iam::aws:policy/CloudWatchLogsFullAccess", + ) + + logger.info(f"Created IAM role for Lambda function: {role_arn}") + + # Wait for role to be available + time.sleep(10) + + except ClientError as e: + if e.response["Error"]["Code"] == "EntityAlreadyExists": + logger.info("IAM role already exists, retrieving ARN...") + response = iam_client.get_role( + RoleName=f"{config.PROJECT_NAME}-lambda-role" + ) + role_arn = response["Role"]["Arn"] + else: + logger.error(f"Error creating IAM role: {e}") + return + + # Create Lambda function for auto-shutdown + lambda_code = f""" +import boto3 +import datetime +import json + +def lambda_handler(event, context): + ec2 = boto3.client('ec2', region_name='{config.AWS_REGION}') + instance_id = '{instance_id}' + + # Check if the instance is running + response = ec2.describe_instances(InstanceIds=[instance_id]) + state = response['Reservations'][0]['Instances'][0]['State']['Name'] + + if state == 'running': + print(f"Stopping instance {{instance_id}} due to inactivity") + ec2.stop_instances(InstanceIds=[instance_id]) + return {{ + 'statusCode': 200, + 'body': json.dumps('Instance stopped due to inactivity') + }} + else: + print(f"Instance {{instance_id}} is not running (state: {{state}})") + return {{ + 'statusCode': 200, + 'body': json.dumps(f'Instance is in {{state}} state, no action taken') + }} + """ + + try: + # Delete existing function if it exists + try: + lambda_client.delete_function(FunctionName=LAMBDA_FUNCTION_NAME) + logger.info(f"Deleted existing Lambda function: {LAMBDA_FUNCTION_NAME}") + except ClientError: + pass # Function doesn't exist, which is fine + + # Create new function + zip_buffer = io.BytesIO() + with zipfile.ZipFile(zip_buffer, "a") as zip_file: + zip_file.writestr("lambda_function.py", lambda_code) + + response = lambda_client.create_function( + FunctionName=LAMBDA_FUNCTION_NAME, + Runtime="python3.9", + Role=role_arn, + Handler="lambda_function.lambda_handler", + Code={"ZipFile": zip_buffer.getvalue()}, + Timeout=30, + MemorySize=128, + Description=f"Auto-shutdown function for {config.PROJECT_NAME} instance", + ) + + lambda_arn = response["FunctionArn"] + logger.info(f"Created Lambda function: {lambda_arn}") + + # Create CloudWatch rule to trigger Lambda after inactivity + try: + events_client.delete_rule(Name=CLOUDWATCH_RULE_NAME) + logger.info(f"Deleted existing CloudWatch rule: {CLOUDWATCH_RULE_NAME}") + except ClientError: + pass # Rule doesn't exist, which is fine + + response = events_client.put_rule( + Name=CLOUDWATCH_RULE_NAME, + ScheduleExpression=f"rate({config.INACTIVITY_TIMEOUT_MINUTES} minutes)", + State="ENABLED", + Description=f"Monitors {config.PROJECT_NAME} instance for inactivity", + ) + + rule_arn = response["RuleArn"] + logger.info(f"Created CloudWatch rule: {rule_arn}") + + # Add lambda permission to be invoked by CloudWatch + try: + lambda_client.add_permission( + FunctionName=LAMBDA_FUNCTION_NAME, + StatementId=f"{config.PROJECT_NAME}-cloudwatch-trigger", + Action="lambda:InvokeFunction", + Principal="events.amazonaws.com", + SourceArn=rule_arn, + ) + except ClientError as e: + if e.response["Error"]["Code"] != "ResourceConflictException": + raise + + # Connect the rule to the Lambda function + events_client.put_targets( + Rule=CLOUDWATCH_RULE_NAME, Targets=[{"Id": "1", "Arn": lambda_arn}] + ) + + logger.info( + f"Auto-shutdown infrastructure created successfully for {instance_id=}" + ) + + except Exception as e: + logger.error(f"Error creating auto-shutdown infrastructure: {e}") + + class Deploy: """Class handling deployment operations for OmniParser.""" @@ -440,7 +613,12 @@ def start() -> None: """Start a new deployment of OmniParser on EC2.""" try: instance_id, instance_ip = configure_ec2_instance() - assert instance_ip, f"invalid {instance_ip=}" + if not instance_id or not instance_ip: + logger.error("Failed to deploy or configure EC2 instance") + return + + # Set up auto-shutdown infrastructure + create_auto_shutdown_infrastructure(instance_id) # Trigger driver installation via login shell Deploy.ssh(non_interactive=True) @@ -558,6 +736,10 @@ def start() -> None: server_url = f"http://{instance_ip}:{config.PORT}" logger.info(f"Deployment complete. Server running at: {server_url}") + logger.info( + f"Auto-shutdown after {config.INACTIVITY_TIMEOUT_MINUTES} minutes " + "of inactivity" + ) # Verify server is accessible from outside try: @@ -601,7 +783,7 @@ def start() -> None: @staticmethod def status() -> None: """Check the status of deployed instances.""" - ec2 = boto3.resource("ec2") + ec2 = boto3.resource("ec2", region_name=config.AWS_REGION) instances = ec2.instances.filter( Filters=[{"Name": "tag:Name", "Values": [config.PROJECT_NAME]}] ) @@ -620,15 +802,58 @@ def status() -> None: f"URL: Not available (no public IP)" ) + # Check auto-shutdown infrastructure + lambda_client = boto3.client("lambda", region_name=config.AWS_REGION) + events_client = boto3.client("events", region_name=config.AWS_REGION) + + try: + lambda_response = lambda_client.get_function( + FunctionName=LAMBDA_FUNCTION_NAME + ) + logger.info(f"Auto-shutdown Lambda: {LAMBDA_FUNCTION_NAME} (Active)") + logger.debug(f"{lambda_response=}") + except ClientError: + logger.info("Auto-shutdown Lambda: Not configured") + + try: + rule_response = events_client.describe_rule(Name=CLOUDWATCH_RULE_NAME) + logger.info( + f"CloudWatch rule: {CLOUDWATCH_RULE_NAME} " + f"(State: {rule_response['State']})" + ) + logger.info( + f"Auto-shutdown interval: {config.INACTIVITY_TIMEOUT_MINUTES} minutes" + ) + except ClientError: + logger.info("CloudWatch rule: Not configured") + + # Check discovery API + try: + apigw_client = boto3.client("apigateway", region_name=config.AWS_REGION) + apis = apigw_client.get_rest_apis() + + api_found = False + for api in apis["items"]: + if api["name"] == API_GATEWAY_NAME: + api_id = api["id"] + api_endpoint = ( + f"https://{api_id}.execute-api" + f".{config.AWS_REGION}.amazonaws.com/prod/discover" + ) + logger.info(f"Discovery API: {api_endpoint}") + api_found = True + break + + if not api_found: + logger.info("Discovery API: Not configured") + + except ClientError as e: + logger.error(f"Error checking API Gateway: {e}") + @staticmethod def ssh(non_interactive: bool = False) -> None: - """SSH into the running instance. - - Args: - non_interactive: If True, run in non-interactive mode - """ # Get instance IP - ec2 = boto3.resource("ec2") + ec2 = boto3.resource("ec2", region_name=config.AWS_REGION) instances = ec2.instances.filter( Filters=[ {"Name": "tag:Name", "Values": [config.PROJECT_NAME]}, @@ -652,42 +877,76 @@ def ssh(non_interactive: bool = False) -> None: return if non_interactive: - # Simulate full login by forcing all initialization scripts + # Trigger driver installation (this might cause reboot) ssh_command = [ "ssh", "-o", - "StrictHostKeyChecking=no", # Automatically accept new host keys + "StrictHostKeyChecking=no", "-o", - "UserKnownHostsFile=/dev/null", # Prevent writing to known_hosts + "UserKnownHostsFile=/dev/null", "-i", config.AWS_EC2_KEY_PATH, f"{config.AWS_EC2_USER}@{ip}", - "-t", # Allocate a pseudo-terminal - "-tt", # Force pseudo-terminal allocation - "bash --login -c 'exit'", # Force full login shell and exit immediately + "-t", + "-tt", + "bash --login -c 'exit'", ] - else: - # Build and execute SSH command - ssh_command = ( - f"ssh -i {config.AWS_EC2_KEY_PATH} -o StrictHostKeyChecking=no " - f"{config.AWS_EC2_USER}@{ip}" + + try: + subprocess.run(ssh_command, check=True) + logger.info("Initial SSH login completed successfully") + except subprocess.CalledProcessError as e: + logger.warning(f"Initial SSH connection closed: {e}") + + # Wait for potential reboot to complete + logger.info( + "Waiting for instance to be fully available after potential reboot..." ) + max_attempts = 20 + attempt = 0 + while attempt < max_attempts: + attempt += 1 + logger.info(f"SSH connection attempt {attempt}/{max_attempts}") + try: + # Check if we can make a new SSH connection + test_ssh_cmd = [ + "ssh", + "-o", + "StrictHostKeyChecking=no", + "-o", + "ConnectTimeout=5", + "-o", + "UserKnownHostsFile=/dev/null", + "-i", + config.AWS_EC2_KEY_PATH, + f"{config.AWS_EC2_USER}@{ip}", + "echo 'SSH connection successful'", + ] + result = subprocess.run( + test_ssh_cmd, capture_output=True, text=True + ) + if result.returncode == 0: + logger.info("Instance is ready for SSH connections") + return + except Exception as e: + pass + + time.sleep(10) # Wait 10 seconds between attempts + + logger.error("Failed to reconnect to instance after potential reboot") + else: + # Interactive SSH session + ssh_command = f"ssh -i {config.AWS_EC2_KEY_PATH} -o StrictHostKeyChecking=no {config.AWS_EC2_USER}@{ip}" logger.info(f"Connecting with: {ssh_command}") os.system(ssh_command) return - # Execute the SSH command for non-interactive mode - try: - subprocess.run(ssh_command, check=True) - except subprocess.CalledProcessError as e: - logger.error(f"SSH connection failed: {e}") - @staticmethod def stop( project_name: str = config.PROJECT_NAME, security_group_name: str = config.AWS_EC2_SECURITY_GROUP, ) -> None: - """Terminates the EC2 instance and deletes the associated security group. + """Terminates EC2 instance and deletes associated security group and resources. Args: project_name (str): The project name used to tag the instance. @@ -695,8 +954,11 @@ def stop( security_group_name (str): The name of the security group to delete. Defaults to config.AWS_EC2_SECURITY_GROUP. """ - ec2_resource = boto3.resource("ec2") - ec2_client = boto3.client("ec2") + ec2_resource = boto3.resource("ec2", region_name=config.AWS_REGION) + ec2_client = boto3.client("ec2", region_name=config.AWS_REGION) + lambda_client = boto3.client("lambda", region_name=config.AWS_REGION) + events_client = boto3.client("events", region_name=config.AWS_REGION) + iam_client = boto3.client("iam", region_name=config.AWS_REGION) # Terminate EC2 instances instances = ec2_resource.instances.filter( @@ -721,6 +983,50 @@ def stop( instance.wait_until_terminated() logger.info(f"Instance {instance.id} terminated successfully.") + # Delete CloudWatch rule and Lambda function + try: + events_client.remove_targets(Rule=CLOUDWATCH_RULE_NAME, Ids=["1"]) + events_client.delete_rule(Name=CLOUDWATCH_RULE_NAME) + logger.info(f"Deleted CloudWatch rule: {CLOUDWATCH_RULE_NAME}") + except ClientError as e: + if e.response["Error"]["Code"] == "ResourceNotFoundException": + logger.info(f"CloudWatch rule {CLOUDWATCH_RULE_NAME} does not exist") + else: + logger.error(f"Error deleting CloudWatch rule: {e}") + + try: + lambda_client.delete_function(FunctionName=LAMBDA_FUNCTION_NAME) + logger.info(f"Deleted Lambda function: {LAMBDA_FUNCTION_NAME}") + except ClientError as e: + if e.response["Error"]["Code"] == "ResourceNotFoundException": + logger.info(f"Lambda function {LAMBDA_FUNCTION_NAME} does not exist") + else: + logger.error(f"Error deleting Lambda function: {e}") + + # Delete IAM roles + for role_name in [ + f"{config.PROJECT_NAME}-lambda-role", + f"{config.PROJECT_NAME}-discovery-role", + ]: + try: + # Detach policies first + attached_policies = iam_client.list_attached_role_policies( + RoleName=role_name + ) + for policy in attached_policies.get("AttachedPolicies", []): + iam_client.detach_role_policy( + RoleName=role_name, PolicyArn=policy["PolicyArn"] + ) + logger.info( + f"Detached policy {policy['PolicyArn']} from role {role_name}" + ) + + iam_client.delete_role(RoleName=role_name) + logger.info(f"Deleted IAM role: {role_name}") + except ClientError as e: + if e.response["Error"]["Code"] != "NoSuchEntity": + logger.error(f"Error deleting IAM role {role_name}: {e}") + # Delete security group try: ec2_client.delete_security_group(GroupName=security_group_name) @@ -734,6 +1040,210 @@ def stop( else: logger.error(f"Error deleting security group: {e}") + logger.info("Cleanup completed successfully.") + + @staticmethod + def stop_instance(instance_id: str) -> None: + """Stop a specific EC2 instance.""" + ec2_client = boto3.client("ec2", region_name=config.AWS_REGION) + try: + ec2_client.stop_instances(InstanceIds=[instance_id]) + logger.info(f"Stopped instance {instance_id}") + except ClientError as e: + logger.error(f"Error stopping instance {instance_id}: {e}") + + @staticmethod + def start_instance(instance_id: str) -> str: + """Start a specific EC2 instance and return its public IP.""" + ec2_client = boto3.client("ec2", region_name=config.AWS_REGION) + ec2_resource = boto3.resource("ec2", region_name=config.AWS_REGION) + + try: + ec2_client.start_instances(InstanceIds=[instance_id]) + logger.info(f"Starting instance {instance_id}...") + + instance = ec2_resource.Instance(instance_id) + instance.wait_until_running() + instance.reload() + + logger.info( + f"Instance {instance_id} started, IP: {instance.public_ip_address}" + ) + return instance.public_ip_address + except ClientError as e: + logger.error(f"Error starting instance {instance_id}: {e}") + return None + + @staticmethod + def history(days: int = 7) -> None: + """Display deployment and auto-shutdown history. + + Args: + days: Number of days of history to retrieve (default: 7) + """ + logger.info(f"Retrieving {days} days of deployment history...") + + # Calculate time range + end_time = datetime.datetime.now() + start_time = end_time - datetime.timedelta(days=days) + + # Initialize AWS clients + cloudwatch_logs = boto3.client("logs", region_name=config.AWS_REGION) + ec2_client = boto3.client("ec2", region_name=config.AWS_REGION) + + # Get instance information + instances = [] + try: + response = ec2_client.describe_instances( + Filters=[{"Name": "tag:Name", "Values": [config.PROJECT_NAME]}] + ) + for reservation in response["Reservations"]: + instances.extend(reservation["Instances"]) + + logger.info( + f"Found {len(instances)} instances with name tag '{config.PROJECT_NAME}'" + ) + except Exception as e: + logger.error(f"Error retrieving instances: {e}") + + # Display instance state transition history + logger.info("\n=== Instance State History ===") + for instance in instances: + instance_id = instance["InstanceId"] + try: + # Get instance state transition history + response = ec2_client.describe_instance_status( + InstanceIds=[instance_id], IncludeAllInstances=True + ) + + state = instance["State"]["Name"] + launch_time = instance.get("LaunchTime", "Unknown") + + logger.info( + f"Instance {instance_id}: Current state={state}, Launch time={launch_time}" + ) + + # Get instance console output if available + try: + console = ec2_client.get_console_output(InstanceId=instance_id) + if "Output" in console and console["Output"]: + logger.info("Last console output (truncated):") + # Show last few lines of console output + lines = console["Output"].strip().split("\n") + for line in lines[-10:]: + logger.info(f" {line}") + except Exception as e: + logger.info(f"Console output not available: {e}") + + except Exception as e: + logger.error(f"Error retrieving status for instance {instance_id}: {e}") + + # Check for Lambda auto-shutdown logs + logger.info("\n=== Auto-shutdown Lambda Logs ===") + try: + # Check if log group exists + log_group_name = f"/aws/lambda/{LAMBDA_FUNCTION_NAME}" + + log_streams = cloudwatch_logs.describe_log_streams( + logGroupName=log_group_name, + orderBy="LastEventTime", + descending=True, + limit=5, + ) + + if not log_streams.get("logStreams"): + logger.info("No log streams found for auto-shutdown Lambda") + else: + # Process the most recent log streams + for stream in log_streams.get("logStreams", [])[:5]: + stream_name = stream["logStreamName"] + logger.info(f"Log stream: {stream_name}") + + logs = cloudwatch_logs.get_log_events( + logGroupName=log_group_name, + logStreamName=stream_name, + startTime=int(start_time.timestamp() * 1000), + endTime=int(end_time.timestamp() * 1000), + limit=100, + ) + + if not logs.get("events"): + logger.info(" No events in this stream") + continue + + for event in logs.get("events", []): + timestamp = datetime.datetime.fromtimestamp( + event["timestamp"] / 1000 + ) + message = event["message"] + logger.info(f" {timestamp}: {message}") + + except cloudwatch_logs.exceptions.ResourceNotFoundException: + logger.info( + "No logs found for auto-shutdown Lambda. It may not have been triggered yet." + ) + except Exception as e: + logger.error(f"Error retrieving Lambda logs: {e}") + + logger.info("\nHistory retrieval complete.") + + +@staticmethod +def discover() -> dict: + """Discover instances by tag and optionally start them if stopped. + + Returns: + dict: Information about the discovered instance including status and connection + details + """ + ec2 = boto3.resource("ec2", region_name=config.AWS_REGION) + + # Find instance with project tag + instances = list( + ec2.instances.filter( + Filters=[ + {"Name": "tag:Name", "Values": [config.PROJECT_NAME]}, + { + "Name": "instance-state-name", + "Values": ["pending", "running", "stopped"], + }, + ] + ) + ) + + if not instances: + logger.info("No instances found") + return {"status": "not_found"} + + instance = instances[0] # Get the first matching instance + logger.info(f"Found instance {instance.id} in state {instance.state['Name']}") + + # If instance is stopped, start it + if instance.state["Name"] == "stopped": + logger.info(f"Starting stopped instance {instance.id}") + instance.start() + return { + "instance_id": instance.id, + "status": "starting", + "message": "Instance is starting. Please try again in a few minutes.", + } + + # Return info for running instance + if instance.state["Name"] == "running": + return { + "instance_id": instance.id, + "public_ip": instance.public_ip_address, + "status": instance.state["Name"], + "api_url": f"http://{instance.public_ip_address}:{config.PORT}", + } + + # Instance is in another state (e.g., pending) + return { + "instance_id": instance.id, + "status": instance.state["Name"], + "message": f"Instance is {instance.state['Name']}. Please try again shortly.", + } + if __name__ == "__main__": fire.Fire(Deploy)