""" Functions for init of the AWS environment for a new MiroSite """ import logging import json import time import boto3 def create_bucket(bucket_name, region_name, aws_access_key, aws_secret_key): """ create_bucket :param bucket_name: The name of the bucket to work with :param region_name: The region the bucket belongs :param aws_access_key: Authentication access key :param aws_secret_key: Authentication secret key :return Boolean for success """ # Create client connection s3 = boto3.client('s3', region_name=region_name, aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key) # Create the bucket try: s3.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={ 'LocationConstraint':region_name }) return True except (s3.exceptions.BucketAlreadyExists, s3.exceptions.BucketAlreadyOwnedByYou): return True except Exception as ex: logging.error("[CREATE S3 BUCKET] [%s] [%s]", bucket_name, str(ex)) return False def configure_website(bucket_name, aws_access_key, aws_secret_key): """ configure_website :param bucket_name: The name of the bucket to work with :param aws_access_key: Authentication access key :param aws_secret_key: Authentication secret key :return Boolean for success """ # Create client connection s3 = boto3.client('s3', aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key) try: # Enable website hosting s3.put_bucket_website( Bucket=bucket_name, WebsiteConfiguration={ 'IndexDocument': {'Suffix': 'index.html'}, 'ErrorDocument': {'Key': 'index.html'}, } ) return True except Exception as ex: logging.error("[WEBSITE ENABLE S3 BUCKET] [%s] [%s]", bucket_name, str(ex)) return False def grant_public_access(bucket_name, aws_access_key, aws_secret_key): """ grant_public_access :param bucket_name: The name of the bucket to work with :param aws_access_key: Authentication access key :param aws_secret_key: Authentication secret key :return Boolean for success """ # Create client connection s3 = boto3.client('s3', aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key) try: # Update policy s3.put_public_access_block(Bucket=bucket_name, PublicAccessBlockConfiguration={ 'BlockPublicAcls': False, 'IgnorePublicAcls': False, 'BlockPublicPolicy': False, 'RestrictPublicBuckets': False }) # Set public access policy public_access_policy = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": "*", "Action": "s3:GetObject", "Resource": f"arn:aws:s3:::{bucket_name}/*" } ] } # Update policy s3.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(public_access_policy)) return True except Exception as ex: logging.error("[PUBLIC POLICY ENABLE S3 BUCKET] [%s] [%s]", bucket_name, str(ex)) return False def get_web_url(bucket_name, region_name): """ get_web_url :param bucket_name: The name of the bucket to work with :param region_name: The region the bucket belongs :return String of a URL """ return f"{bucket_name}.s3-website-{region_name}.amazonaws.com" def get_cloudfront_distribution(bucket_name, aws_access_key, aws_secret_key): """ get_cloudfront_distribution :param bucket_name: The name of the bucket to work with :param aws_access_key: Authentication access key :param aws_secret_key: Authentication secret key :return New Distribution ID or None if failed """ # Create client connection cloudfront = boto3.client('cloudfront', aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key) paginator = cloudfront.get_paginator('list_distributions') for page in paginator.paginate(): # Check if there are distributions in the current page if 'DistributionList' in page and 'Items' in page['DistributionList']: for distribution in page['DistributionList']['Items']: if distribution.get('Comment') == bucket_name[:127]: return distribution return None def create_cloudfront_distribution(bucket_name, region_name, logging_bucket, aws_access_key, aws_secret_key): """ create_cloudfront_distribution :param bucket_name: The name of the bucket to work with :param region_name: The region the bucket belongs :param logging_bucket: Bucket to send logs to :param aws_access_key: Authentication access key :param aws_secret_key: Authentication secret key :return New Distribution ID or None if failed """ # Create client connection cloudfront = boto3.client('cloudfront', aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key) # Create CloudFront distribution distribution_config = { 'CallerReference': bucket_name, 'Origins': { 'Quantity': 1, 'Items': [ { 'Id': get_web_url(bucket_name, region_name), 'DomainName': get_web_url(bucket_name, region_name), 'OriginPath': '', 'CustomOriginConfig': { 'HTTPPort': 80, 'HTTPSPort': 443, 'OriginProtocolPolicy': 'http-only' } } ] }, 'DefaultRootObject': 'index.html', 'DefaultCacheBehavior': { 'TargetOriginId': get_web_url(bucket_name, region_name), 'ViewerProtocolPolicy': 'redirect-to-https', 'CachePolicyId': '4135ea2d-6df8-44a3-9df3-4b5a84be39ad' }, 'Logging': { 'Enabled': True, 'IncludeCookies': False, 'Bucket': logging_bucket, 'Prefix': bucket_name, }, 'Comment': bucket_name[:127], 'Enabled': True } try: response = cloudfront.create_distribution(DistributionConfig=distribution_config) return response['Distribution'] except cloudfront.exceptions.DistributionAlreadyExists as ee: logging.error("[DISTRIBUTIONALREADYEXISTS] [%s] [%s]", bucket_name, str(ee)) return get_cloudfront_distribution(bucket_name, aws_access_key, aws_secret_key) return None def create_certificate(domain_name, region_name, aws_access_key, aws_secret_key): """ create_certificate :param domain_name: The name of the domain to create cert for :param region_name: The region the bucket belongs :param aws_access_key: Authentication access key :param aws_secret_key: Authentication secret key :return New Certificate or None if failed """ # Create client connection acm = boto3.client('acm', region_name=region_name, aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key) # Request a new ACM certificate response = acm.request_certificate( DomainName=domain_name, ValidationMethod='DNS', ) # Wait a few seconds. New AWS certificates take a little to create. time.sleep(10) # Get the certificate ARN return response['CertificateArn'] def get_certificate(certificate_arn, region_name, aws_access_key, aws_secret_key): """ get_certificate :param certificate_arn: The certificate ARN :param region_name: The region the bucket belongs :param aws_access_key: Authentication access key :param aws_secret_key: Authentication secret key :return New Certificate or None if failed """ # Create client connection acm = boto3.client('acm', region_name=region_name, aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key) try: # Get certificate new_certificate_req = acm.describe_certificate(CertificateArn=certificate_arn) return new_certificate_req['Certificate'] except Exception as ex: logging.error("[CERTIFICATE GET] [%s] [%s]", certificate_arn, str(ex)) return None def get_certificate_dns(certificate_arn, domain_name, region_name, aws_access_key, aws_secret_key): """ get_certificate_dns :param certificate_arn: The certificate ARN :param domain_name: The name of the domain to create cert for :param region_name: The region the bucket belongs :param aws_access_key: Authentication access key :param aws_secret_key: Authentication secret key :return New Certificate DNS """ # Create client connection acm = boto3.client('acm', region_name=region_name, aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key) # Get certificate new_certificate_req = acm.describe_certificate(CertificateArn=certificate_arn) new_certificate = new_certificate_req['Certificate'] # Get the DNS information certificate_dns = [] if new_certificate: for dvo in new_certificate.get('DomainValidationOptions', []): if not dvo.get('DomainName') == domain_name: continue # Get the data needed for DNS update and save to list resource_record_type = dvo.get('ResourceRecord', {}).get('Type', '') resource_record_name = dvo.get('ResourceRecord', {}).get('Name', '') resource_record_value = dvo.get('ResourceRecord', {}).get('Value', '') certificate_dns.append({ "type": resource_record_type, "name": resource_record_name, "value": resource_record_value }) # Return results return certificate_dns def update_cloudfront_distribution(distribution_id, aliases, acm_certificate_arn, aws_access_key, aws_secret_key): """ update_cloudfront_distribution :param distribution_id: :param aliases: :param acm_certificate_arn: :param aws_access_key: Authentication access key :param aws_secret_key: Authentication secret key :return Boolean for success """ # Create client connection cloudfront = boto3.client('cloudfront', aws_access_key_id=aws_access_key, aws_secret_access_key=aws_secret_key) try: # Get Distribution and update values distribution_config = cloudfront.get_distribution_config(Id=distribution_id) distribution_config['DistributionConfig']['Aliases']['Items'] = aliases distribution_config['DistributionConfig']['Aliases']['Quantity'] = len(aliases) # Update ViewerCertificate new_viewercertificate = distribution_config['DistributionConfig']['ViewerCertificate'] new_viewercertificate['CloudFrontDefaultCertificate'] = False new_viewercertificate['CertificateSource'] = "acm" new_viewercertificate['ACMCertificateArn'] = acm_certificate_arn new_viewercertificate['SSLSupportMethod'] = "sni-only" new_viewercertificate['MinimumProtocolVersion'] = "TLSv1.2_2021" distribution_config['DistributionConfig']['ViewerCertificate'] = new_viewercertificate # Update the Distribution cloudfront.update_distribution( DistributionConfig=distribution_config['DistributionConfig'], Id=distribution_id, IfMatch=distribution_config['ETag'] ) return True except Exception as ex: logging.error("[DISTRIBUTION UPDATE] [%s] [%s]", distribution_id, str(ex)) return False