lib-af-unify/lib_afc_unify/aws.py

380 lines
12 KiB
Python
Raw Permalink Normal View History

2024-03-24 18:03:12 +00:00
"""
Functions for init of the AWS environment for a new MiroSite
"""
import logging
import json
import time
import boto3
def create_bucket(bucket_name, region_name, aws_access_key, aws_secret_key):
    """
    create_bucket
    Create an S3 bucket in the given region. A bucket that already exists
    is reported as success.
    :param bucket_name: The name of the bucket to work with
    :param region_name: The region the bucket belongs
    :param aws_access_key: Authentication access key
    :param aws_secret_key: Authentication secret key
    :return Boolean for success
    """
    # Create client connection
    s3 = boto3.client('s3',
                      region_name=region_name,
                      aws_access_key_id=aws_access_key,
                      aws_secret_access_key=aws_secret_key)

    create_args = {'Bucket': bucket_name}
    # us-east-1 is the S3 default location and the API rejects it when it is
    # passed as an explicit LocationConstraint, so the constraint is only
    # sent for the other regions.
    if region_name != 'us-east-1':
        create_args['CreateBucketConfiguration'] = {
            'LocationConstraint': region_name
        }

    # Create the bucket
    try:
        s3.create_bucket(**create_args)
        return True
    except (s3.exceptions.BucketAlreadyExists, s3.exceptions.BucketAlreadyOwnedByYou):
        # NOTE(review): BucketAlreadyExists is also raised when the name is
        # taken by a different account; this is still reported as success
        # here, as before - confirm that is intended.
        return True
    except Exception as ex:
        logging.error("[CREATE S3 BUCKET] [%s] [%s]", bucket_name, str(ex))
        return False
def configure_website(bucket_name, aws_access_key, aws_secret_key):
    """
    configure_website
    Turn on static website hosting for the bucket, using index.html as both
    the index document and the error document.
    :param bucket_name: The name of the bucket to work with
    :param aws_access_key: Authentication access key
    :param aws_secret_key: Authentication secret key
    :return Boolean for success
    """
    # Create client connection
    client = boto3.client(
        's3',
        aws_access_key_id=aws_access_key,
        aws_secret_access_key=aws_secret_key,
    )

    # Hosting settings: every miss falls back to index.html
    website_settings = {
        'IndexDocument': {'Suffix': 'index.html'},
        'ErrorDocument': {'Key': 'index.html'},
    }

    try:
        # Enable website hosting
        client.put_bucket_website(Bucket=bucket_name,
                                  WebsiteConfiguration=website_settings)
    except Exception as ex:
        logging.error("[WEBSITE ENABLE S3 BUCKET] [%s] [%s]", bucket_name, str(ex))
        return False
    return True
def grant_public_access(bucket_name, aws_access_key, aws_secret_key):
    """
    grant_public_access
    Switch off all four public-access-block settings on the bucket and then
    attach a bucket policy allowing anyone to s3:GetObject every object.
    :param bucket_name: The name of the bucket to work with
    :param aws_access_key: Authentication access key
    :param aws_secret_key: Authentication secret key
    :return Boolean for success
    """
    # Create client connection
    client = boto3.client(
        's3',
        aws_access_key_id=aws_access_key,
        aws_secret_access_key=aws_secret_key,
    )

    try:
        # Lift the public access block first so the policy below is accepted
        unblock_all = {
            'BlockPublicAcls': False,
            'IgnorePublicAcls': False,
            'BlockPublicPolicy': False,
            'RestrictPublicBuckets': False
        }
        client.put_public_access_block(
            Bucket=bucket_name,
            PublicAccessBlockConfiguration=unblock_all,
        )

        # Read-only access for everybody on every key in the bucket
        read_statement = {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "s3:GetObject",
            "Resource": f"arn:aws:s3:::{bucket_name}/*"
        }
        policy_document = json.dumps({
            "Version": "2012-10-17",
            "Statement": [read_statement]
        })

        # Attach the policy
        client.put_bucket_policy(Bucket=bucket_name, Policy=policy_document)
    except Exception as ex:
        logging.error("[PUBLIC POLICY ENABLE S3 BUCKET] [%s] [%s]", bucket_name, str(ex))
        return False
    return True
# Regions whose S3 static-website endpoint uses the dash form
# "s3-website-<region>". Every other region uses the dot form
# "s3-website.<region>".
_S3_WEBSITE_DASH_REGIONS = frozenset({
    'us-east-1',
    'us-west-1',
    'us-west-2',
    'us-gov-west-1',
    'ap-southeast-1',
    'ap-southeast-2',
    'ap-northeast-1',
    'eu-west-1',
    'sa-east-1',
})


def get_web_url(bucket_name, region_name):
    """
    get_web_url
    Build the S3 static-website endpoint host name for a bucket. The result
    carries no scheme (no "http://" prefix).
    :param bucket_name: The name of the bucket to work with
    :param region_name: The region the bucket belongs
    :return String of a URL
    """
    # Older regions separate "s3-website" and the region with a dash, newer
    # regions with a dot; using the wrong one gives a host that is not the
    # documented endpoint for that region.
    separator = '-' if region_name in _S3_WEBSITE_DASH_REGIONS else '.'
    return f"{bucket_name}.s3-website{separator}{region_name}.amazonaws.com"
def get_cloudfront_distribution(bucket_name, aws_access_key, aws_secret_key):
    """
    get_cloudfront_distribution
    Walk every page of the account's CloudFront distributions and pick the
    first one whose Comment equals the bucket name cut to 127 characters.
    :param bucket_name: The name of the bucket to work with
    :param aws_access_key: Authentication access key
    :param aws_secret_key: Authentication secret key
    :return The matching distribution entry (dict) or None if not found
    """
    # Create client connection
    client = boto3.client(
        'cloudfront',
        aws_access_key_id=aws_access_key,
        aws_secret_access_key=aws_secret_key,
    )

    pages = client.get_paginator('list_distributions').paginate()
    for page in pages:
        # Pages without a distribution list or without items hold nothing to check
        if 'DistributionList' not in page or 'Items' not in page['DistributionList']:
            continue
        found = next(
            (entry for entry in page['DistributionList']['Items']
             if entry.get('Comment') == bucket_name[:127]),
            None,
        )
        if found is not None:
            return found
    return None
def create_cloudfront_distribution(bucket_name, region_name, logging_bucket,
                                   aws_access_key, aws_secret_key):
    """
    create_cloudfront_distribution
    Create a CloudFront distribution in front of the bucket's S3 website
    endpoint. When CloudFront reports that the distribution already exists,
    the existing one is looked up (by its comment) and returned instead.
    :param bucket_name: The name of the bucket to work with
    :param region_name: The region the bucket belongs
    :param logging_bucket: Bucket to send logs to
    :param aws_access_key: Authentication access key
    :param aws_secret_key: Authentication secret key
    :return The distribution (dict) or None if failed
    """
    # Create client connection
    cloudfront = boto3.client('cloudfront',
                              aws_access_key_id=aws_access_key,
                              aws_secret_access_key=aws_secret_key)

    # The website endpoint is used both as the origin id and as the origin
    # domain, so compute it once.
    origin = get_web_url(bucket_name, region_name)

    # Create CloudFront distribution
    distribution_config = {
        # The bucket name is the idempotency token for the create request
        'CallerReference': bucket_name,
        'Origins': {
            'Quantity': 1,
            'Items': [
                {
                    'Id': origin,
                    'DomainName': origin,
                    'OriginPath': '',
                    'CustomOriginConfig': {
                        'HTTPPort': 80,
                        'HTTPSPort': 443,
                        # CloudFront talks plain HTTP to the origin
                        'OriginProtocolPolicy': 'http-only'
                    }
                }
            ]
        },
        'DefaultRootObject': 'index.html',
        'DefaultCacheBehavior': {
            'TargetOriginId': origin,
            'ViewerProtocolPolicy': 'redirect-to-https',
            # NOTE(review): presumably the AWS managed "CachingDisabled"
            # cache policy - confirm against the CloudFront console.
            'CachePolicyId': '4135ea2d-6df8-44a3-9df3-4b5a84be39ad'
        },
        'Logging': {
            'Enabled': True,
            'IncludeCookies': False,
            'Bucket': logging_bucket,
            'Prefix': bucket_name,
        },
        # The comment is what get_cloudfront_distribution matches on
        'Comment': bucket_name[:127],
        'Enabled': True
    }
    try:
        response = cloudfront.create_distribution(DistributionConfig=distribution_config)
        return response['Distribution']
    except cloudfront.exceptions.DistributionAlreadyExists as ee:
        logging.error("[DISTRIBUTIONALREADYEXISTS] [%s] [%s]", bucket_name, str(ee))
        return get_cloudfront_distribution(bucket_name, aws_access_key, aws_secret_key)
    except Exception as ex:
        # Any other failure is logged and reported as None, matching the
        # documented return contract and the other helpers in this module.
        logging.error("[DISTRIBUTION CREATE] [%s] [%s]", bucket_name, str(ex))
        return None
def create_certificate(domain_name, region_name, aws_access_key, aws_secret_key):
    """
    create_certificate
    Request a new DNS-validated ACM certificate for the domain and pause
    briefly so the certificate has time to be created.
    :param domain_name: The name of the domain to create cert for
    :param region_name: The region the bucket belongs
    :param aws_access_key: Authentication access key
    :param aws_secret_key: Authentication secret key
    :return ARN of the new certificate or None if failed
    """
    # Create client connection
    acm = boto3.client('acm',
                       region_name=region_name,
                       aws_access_key_id=aws_access_key,
                       aws_secret_access_key=aws_secret_key)
    # Request a new ACM certificate
    try:
        response = acm.request_certificate(
            DomainName=domain_name,
            ValidationMethod='DNS',
        )
    except Exception as ex:
        # Log and report None, as the return contract documents
        logging.error("[CERTIFICATE CREATE] [%s] [%s]", domain_name, str(ex))
        return None
    # Wait a few seconds. New AWS certificates take a little to create.
    time.sleep(10)
    # Get the certificate ARN
    return response['CertificateArn']
def get_certificate(certificate_arn, region_name, aws_access_key, aws_secret_key):
    """
    get_certificate
    Look up an ACM certificate by ARN and hand back its description.
    :param certificate_arn: The certificate ARN
    :param region_name: The region the bucket belongs
    :param aws_access_key: Authentication access key
    :param aws_secret_key: Authentication secret key
    :return The 'Certificate' part of the describe response or None if failed
    """
    # Create client connection
    client = boto3.client(
        'acm',
        region_name=region_name,
        aws_access_key_id=aws_access_key,
        aws_secret_access_key=aws_secret_key,
    )
    try:
        # Describe the certificate and unwrap the payload in one step
        return client.describe_certificate(CertificateArn=certificate_arn)['Certificate']
    except Exception as ex:
        logging.error("[CERTIFICATE GET] [%s] [%s]", certificate_arn, str(ex))
        return None
def get_certificate_dns(certificate_arn, domain_name, region_name, aws_access_key, aws_secret_key):
    """
    get_certificate_dns
    Collect the DNS validation records of an ACM certificate for one domain.
    Each record is a dict with "type", "name" and "value"; a field missing
    from the certificate's ResourceRecord comes back as an empty string.
    :param certificate_arn: The certificate ARN
    :param domain_name: The name of the domain to create cert for
    :param region_name: The region the bucket belongs
    :param aws_access_key: Authentication access key
    :param aws_secret_key: Authentication secret key
    :return List of DNS record dicts (empty when nothing matches)
    """
    # Create client connection
    client = boto3.client(
        'acm',
        region_name=region_name,
        aws_access_key_id=aws_access_key,
        aws_secret_access_key=aws_secret_key,
    )

    # Get certificate
    details = client.describe_certificate(CertificateArn=certificate_arn)['Certificate']
    if not details:
        return []

    # Keep only the validation options that belong to the requested domain
    resource_records = (
        option.get('ResourceRecord', {})
        for option in details.get('DomainValidationOptions', [])
        if option.get('DomainName') == domain_name
    )

    # Reduce each record to the fields needed for the DNS update
    return [
        {
            "type": record.get('Type', ''),
            "name": record.get('Name', ''),
            "value": record.get('Value', '')
        }
        for record in resource_records
    ]
def update_cloudfront_distribution(distribution_id, aliases, acm_certificate_arn,
                                   aws_access_key, aws_secret_key):
    """
    update_cloudfront_distribution
    Fetch the current config of a distribution, replace its aliases, point
    its viewer certificate at the given ACM certificate (SNI only, minimum
    TLSv1.2_2021) and push the config back using the fetched ETag.
    :param distribution_id: Id of the distribution to update
    :param aliases: Sized collection of alias entries stored as Aliases.Items
    :param acm_certificate_arn: ARN of the ACM certificate to serve
    :param aws_access_key: Authentication access key
    :param aws_secret_key: Authentication secret key
    :return Boolean for success
    """
    # Create client connection
    client = boto3.client(
        'cloudfront',
        aws_access_key_id=aws_access_key,
        aws_secret_access_key=aws_secret_key,
    )
    try:
        # Current config plus the ETag needed for the conditional update
        current = client.get_distribution_config(Id=distribution_id)
        config = current['DistributionConfig']

        # Swap in the new aliases
        alias_section = config['Aliases']
        alias_section['Items'] = aliases
        alias_section['Quantity'] = len(aliases)

        # Switch the viewer certificate over to the ACM certificate
        config['ViewerCertificate'].update({
            'CloudFrontDefaultCertificate': False,
            'CertificateSource': "acm",
            'ACMCertificateArn': acm_certificate_arn,
            'SSLSupportMethod': "sni-only",
            'MinimumProtocolVersion': "TLSv1.2_2021",
        })

        # Update the Distribution
        client.update_distribution(
            DistributionConfig=config,
            Id=distribution_id,
            IfMatch=current['ETag']
        )
    except Exception as ex:
        logging.error("[DISTRIBUTION UPDATE] [%s] [%s]", distribution_id, str(ex))
        return False
    return True