src/lambda_codebase/account_processing/process_account_files.py (90 additions, 18 deletions)
@@ -11,6 +11,7 @@
import os
import tempfile
import logging
from typing import Any, TypedDict
import yaml

from yaml.error import YAMLError
@@ -27,9 +28,25 @@
ACCOUNT_MANAGEMENT_STATEMACHINE = os.getenv(
"ACCOUNT_MANAGEMENT_STATEMACHINE_ARN",
)
ADF_VERSION = os.getenv("ADF_VERSION")
ADF_VERSION_METADATA_KEY = "adf_version"


class S3ObjectLocation(TypedDict):
bucket_name: str
key: str


class AccountFileData(TypedDict):
"""
Class used to return YAML account file data and its related
metadata like the execution_id of the CodePipeline that uploaded it.
"""
content: Any
execution_id: str


def get_details_from_event(event: dict) -> S3ObjectLocation:
s3_details = event.get("Records", [{}])[0].get("s3")
if not s3_details:
raise ValueError("No S3 Event details present in event trigger")
@@ -41,33 +58,56 @@ def get_details_from_event(event: dict):
}


def get_file_from_s3(
s3_object_location: S3ObjectLocation,
s3_resource: boto3.resource,
) -> AccountFileData:
try:
LOGGER.debug(
"Reading YAML from S3: %s",
json.dumps(s3_object, indent=2) if LOGGER.isEnabledFor(logging.DEBUG) else "--data-hidden--"
"Reading YAML from S3: %s from %s",
            s3_object_location.get('key'),
s3_object_location.get('bucket_name'),
)
s3_object = s3_resource.Object(**s3_object_location)
object_adf_version = s3_object.metadata.get(
ADF_VERSION_METADATA_KEY,
"n/a",
)
if object_adf_version != ADF_VERSION:
LOGGER.info(
"Skipping S3 object: %s as it is generated with "
"an older ADF version ('adf_version' metadata = '%s')",
s3_object_location,
object_adf_version,
)
return {
"content": {},
"execution_id": ""
}

with tempfile.TemporaryFile(mode='w+b') as file_pointer:
s3_object.download_fileobj(file_pointer)

# Move pointer to the start of the file
file_pointer.seek(0)

return {
"content": yaml.safe_load(file_pointer),
"execution_id": s3_object.metadata.get("execution_id"),
}
except ClientError as error:
LOGGER.error(
"Failed to download %s from %s, due to %s",
            s3_object_location.get('key'),
s3_object_location.get('bucket_name'),
error,
)
raise
except YAMLError as yaml_error:
LOGGER.error(
"Failed to parse YAML file: %s from %s, due to %s",
            s3_object_location.get('key'),
s3_object_location.get('bucket_name'),
yaml_error,
)
raise
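
# A minimal sketch (not part of this module; bucket and key are
# hypothetical) of how the 'adf_version' user metadata read above is
# written and read back with boto3. S3 returns user-defined metadata
# lowercased and without the 'x-amz-meta-' prefix:
#
#   s3 = boto3.resource("s3")
#   obj = s3.Object("example-adf-accounts-bucket", "adf-accounts/dev.yml")
#   obj.put(Body=b"accounts: []", Metadata={"adf_version": "x.y.z"})
#   obj.reload()
#   assert obj.metadata.get("adf_version") == "x.y.z"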
@@ -89,7 +129,9 @@ def process_account(account_lookup, account):


def process_account_list(all_accounts, accounts_in_file):
account_lookup = {account["Name"]: account["Id"] for account in all_accounts}
account_lookup = {
account["Name"]: account["Id"] for account in all_accounts
}
processed_accounts = list(map(
lambda account: process_account(
account_lookup=account_lookup,
@@ -100,24 +142,48 @@
return processed_accounts


def start_executions(
sfn_client,
processed_account_list,
codepipeline_execution_id: str,
request_id: str,
):
if not codepipeline_execution_id:
codepipeline_execution_id = "no-codepipeline-exec-id-found"
short_request_id = request_id[-12:]
run_id = f"{codepipeline_execution_id}-{short_request_id}"
LOGGER.info(
"Invoking Account Management State Machine (%s)",
"Invoking Account Management State Machine (%s) -> %s",
ACCOUNT_MANAGEMENT_STATEMACHINE,
run_id,
)
for account in processed_account_list:
LOGGER.debug(f"Payload: {account}")
full_account_name = account.get('account_full_name', 'no-account-name')
        # AWS Step Functions execution names support at most 80 characters.
        # Since the run_id takes up 49 characters plus the dash, we have 30
        # characters available. To ensure we don't run over, let's use a
        # truncated version of the account name instead:
truncated_account_name = full_account_name[:30]
sfn_execution_name = f"{truncated_account_name}-{run_id}"

LOGGER.debug(
"Payload for %s: %s",
sfn_execution_name,
account,
)
sfn_client.start_execution(
stateMachineArn=ACCOUNT_MANAGEMENT_STATEMACHINE,
name=sfn_execution_name,
input=f"{json.dumps(account)}",
)
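
# Worked example of the name-length budget above (IDs are hypothetical):
#   codepipeline_execution_id = "6cf9d5a4-3d3a-4c8a-8bc5-12345678abcd"  # 36
#   short_request_id = "56789abcdef0"                                   # 12
#   run_id = f"{codepipeline_execution_id}-{short_request_id}"          # 49
#   sfn_execution_name = f"{'x' * 30}-{run_id}"
#   assert len(sfn_execution_name) == 80  # exactly the maximum allowed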


def lambda_handler(event, context):
"""Main Lambda Entry point"""
LOGGER.debug(
"Processing event: %s",
json.dumps(event, indent=2) if LOGGER.isEnabledFor(logging.DEBUG)
else "--data-hidden--"
)
sfn_client = boto3.client("stepfunctions")
s3_resource = boto3.resource("s3")
@@ -127,8 +193,14 @@ def lambda_handler(event, _):

processed_account_list = process_account_list(
all_accounts=all_accounts,
accounts_in_file=account_file.get("accounts"),
accounts_in_file=account_file.get("content", {}).get("accounts", []),
)

if processed_account_list:
start_executions(
sfn_client,
processed_account_list,
codepipeline_execution_id=account_file.get("execution_id"),
request_id=context.aws_request_id,
)
return event
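
For reference, a minimal sketch of the S3 event notification shape that
get_details_from_event consumes (bucket and key values are hypothetical;
only the fields accessed here are shown):

    event = {
        "Records": [{
            "s3": {
                "bucket": {"name": "example-adf-accounts-bucket"},
                "object": {"key": "adf-accounts/dev.yml"},
            },
        }],
    }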
@@ -773,7 +773,7 @@ Resources:
build:
commands:
- python adf-build/helpers/sync_to_s3.py --metadata adf_version=${!ADF_VERSION} --upload-with-metadata execution_id=${!CODEPIPELINE_EXECUTION_ID} deployment_map.yml s3://$ADF_PIPELINES_BUCKET/deployment_map.yml
- python adf-build/helpers/sync_to_s3.py --extension .yml --extension .yaml --metadata adf_version=${!ADF_VERSION} --upload-with-metadata execution_id=${!CODEPIPELINE_EXECUTION_ID} --recursive deployment_maps s3://$ADF_PIPELINES_BUCKET/deployment_maps
post_build:
commands:
- echo "Pipelines are updated in the AWS Step Functions ADFPipelineManagementStateMachine."
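
The sync_to_s3.py helper above attaches these values as S3 object metadata at
upload time. A minimal boto3 sketch of the same idea (bucket, key, and values
are hypothetical; this is not the helper's actual implementation):

    import boto3

    s3_client = boto3.client("s3")
    s3_client.upload_file(
        Filename="deployment_map.yml",
        Bucket="example-adf-pipelines-bucket",
        Key="deployment_map.yml",
        ExtraArgs={
            "Metadata": {
                "adf_version": "x.y.z",
                "execution_id": "example-execution-id",
            },
        },
    )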
@@ -20,6 +20,8 @@
DEPLOYMENT_ACCOUNT_REGION = os.environ["AWS_REGION"]
DEPLOYMENT_ACCOUNT_ID = os.environ["ACCOUNT_ID"]
PIPELINE_MANAGEMENT_STATEMACHINE = os.getenv("PIPELINE_MANAGEMENT_STATE_MACHINE")
ADF_VERSION = os.getenv("ADF_VERSION")
ADF_VERSION_METADATA_KEY = "adf_version"


_cache = None
@@ -42,6 +44,19 @@ def get_file_from_s3(s3_details: dict, s3_resource: boto3.resource):
s3_object = s3_resource.Object(
s3_details.get("bucket_name"), s3_details.get("object_key")
)
object_adf_version = s3_object.metadata.get(
ADF_VERSION_METADATA_KEY,
"n/a",
)
if object_adf_version != ADF_VERSION:
LOGGER.info(
"Skipping S3 object: %s as it is generated with "
"an older ADF version ('adf_version' metadata = '%s')",
s3_details,
object_adf_version,
)
return {}

with tempfile.TemporaryFile() as file_pointer:
s3_object.download_fileobj(file_pointer)

@@ -88,6 +103,7 @@ def lambda_handler(event, _):
sfn_client = boto3.client("stepfunctions")
s3_details = get_details_from_event(event)
deployment_map = get_file_from_s3(s3_details, s3_resource)
deployment_map["definition_bucket"] = s3_details.get("object_key")
start_executions(sfn_client, deployment_map)
if deployment_map:
deployment_map["definition_bucket"] = s3_details.get("object_key")
start_executions(sfn_client, deployment_map)
return output
@@ -7,6 +7,10 @@
"""

import os
import sys
import time
from math import floor
from datetime import datetime
from thread import PropagatingThread

import boto3
@@ -33,6 +37,24 @@
ADF_VERSION = os.environ["ADF_VERSION"]
ADF_LOG_LEVEL = os.environ["ADF_LOG_LEVEL"]
DEPLOYMENT_ACCOUNT_S3_BUCKET_NAME = os.environ["DEPLOYMENT_ACCOUNT_BUCKET"]
CODEPIPELINE_EXECUTION_ID = os.environ.get("CODEPIPELINE_EXECUTION_ID")
CODEBUILD_START_TIME_UNIXTS = floor(
int(
# Returns the unix timestamp in milliseconds
os.environ.get(
"CODEBUILD_START_TIME",
# Fall back to 10 minutes ago + convert Python timestamp from
# seconds to milliseconds:
floor(datetime.now().timestamp() - (10 * 60)) * 1000,
)
) / 1000.0 # Convert milliseconds to seconds
)
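
# Worked example (hypothetical value): CODEBUILD_START_TIME = "1672531500000"
# milliseconds -> int(...) / 1000.0 = 1672531500.0 seconds -> floor(...) =
# 1672531500, directly comparable to datetime.timestamp() values in seconds.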
ACCOUNT_MANAGEMENT_STATE_MACHINE_ARN = os.environ.get(
"ACCOUNT_MANAGEMENT_STATE_MACHINE_ARN",
)
ACCOUNT_BOOTSTRAPPING_STATE_MACHINE_ARN = os.environ.get(
"ACCOUNT_BOOTSTRAPPING_STATE_MACHINE_ARN"
)
ADF_DEFAULT_SCM_FALLBACK_BRANCH = 'master'
LOGGER = configure_logger(__name__)

@@ -252,10 +274,119 @@ def worker_thread(
return


def await_sfn_executions(sfn_client):
_await_running_sfn_executions(
sfn_client,
ACCOUNT_MANAGEMENT_STATE_MACHINE_ARN,
filter_lambda=lambda item: (
            # Substring check; find(...) > 0 would miss a match at index 0.
            CODEPIPELINE_EXECUTION_ID in item.get('name', '')
),
status_filter='RUNNING',
)
_await_running_sfn_executions(
sfn_client,
ACCOUNT_BOOTSTRAPPING_STATE_MACHINE_ARN,
filter_lambda=None,
status_filter='RUNNING',
)
if _sfn_execution_exists_with(
sfn_client,
ACCOUNT_MANAGEMENT_STATE_MACHINE_ARN,
filter_lambda=lambda item: (
            CODEPIPELINE_EXECUTION_ID in item.get('name', '')
and item.get('status') in ['FAILED', 'TIMED_OUT', 'ABORTED']
),
status_filter=None,
):
        LOGGER.error(
            "Account Management State Machine encountered a failed, "
            "timed out, or aborted execution. Please look into this problem "
            "before retrying the bootstrap pipeline. You can navigate to: "
            "https://%(region)s.console.aws.amazon.com/states/home?"
            "region=%(region)s#/statemachines/view/%(sfn_arn)s",
            {
                "region": REGION_DEFAULT,
                "sfn_arn": ACCOUNT_MANAGEMENT_STATE_MACHINE_ARN,
            },
        )
sys.exit(1)
if _sfn_execution_exists_with(
sfn_client,
ACCOUNT_BOOTSTRAPPING_STATE_MACHINE_ARN,
filter_lambda=lambda item: (
(
item.get('startDate', datetime.now()).timestamp()
>= CODEBUILD_START_TIME_UNIXTS
)
and item.get('status') in ['FAILED', 'TIMED_OUT', 'ABORTED']
),
status_filter=None,
):
LOGGER.error(
"Account Bootstrapping State Machine encountered a failed, "
"timed out, or aborted execution. Please look into this problem "
"before retrying the bootstrap pipeline. You can navigate to: "
"https://%(region)s.console.aws.amazon.com/states/home"
"?region=%(region)s#/statemachines/view/%(sfn_arn)s",
{
"region": REGION_DEFAULT,
"sfn_arn": ACCOUNT_BOOTSTRAPPING_STATE_MACHINE_ARN,
},
)
sys.exit(2)


def _await_running_sfn_executions(
sfn_client,
sfn_arn,
filter_lambda,
status_filter,
):
while _sfn_execution_exists_with(
sfn_client,
sfn_arn,
filter_lambda,
status_filter
):
LOGGER.info(
"Waiting for 30 seconds for the executions of %s to finish.",
sfn_arn,
)
time.sleep(30)


def _sfn_execution_exists_with(
sfn_client,
sfn_arn,
filter_lambda,
status_filter,
):
request_params = {
"stateMachineArn": sfn_arn,
}
if status_filter:
request_params["statusFilter"] = status_filter

paginator = sfn_client.get_paginator('list_executions')
for page in paginator.paginate(**request_params):
filtered = (
list(filter(filter_lambda, page["executions"])) if filter_lambda
else page["executions"]
)
if filtered:
LOGGER.info(
"Found %d state machine %s that are running.",
len(filtered),
"executions" if len(filtered) > 1 else "execution",
)
return True

return False
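
# For reference, each item in page["executions"] above is shaped roughly
# like this (values are hypothetical):
#   {
#       "executionArn": "arn:aws:states:eu-west-1:111111111111:execution:...",
#       "name": "sandbox-account-6cf9d5a4-...",
#       "status": "RUNNING",
#       "startDate": datetime(2023, 1, 1, tzinfo=timezone.utc),
#   }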


def main(): # pylint: disable=R0915
LOGGER.info("ADF Version %s", ADF_VERSION)
LOGGER.info("ADF Log Level is %s", ADF_LOG_LEVEL)

await_sfn_executions(boto3.client('stepfunctions'))

policies = OrganizationPolicy()
config = Config()
config.store_config()