add unify functions
This commit is contained in:
parent
1186d0baba
commit
e752ee725c
|
|
@ -5,4 +5,3 @@ lib_afc_unify
|
||||||
A Python library for working with AFC Unify
|
A Python library for working with AFC Unify
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,379 @@
|
||||||
|
"""
|
||||||
|
Functions for init of the AWS environment for a new MiroSite
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
|
||||||
|
import boto3
|
||||||
|
|
||||||
|
|
||||||
|
def create_bucket(bucket_name, region_name, aws_access_key, aws_secret_key):
|
||||||
|
"""
|
||||||
|
create_bucket
|
||||||
|
|
||||||
|
:param bucket_name: The name of the bucket to work with
|
||||||
|
:param region_name: The region the bucket belongs
|
||||||
|
:param aws_access_key: Authentication access key
|
||||||
|
:param aws_secret_key: Authentication secret key
|
||||||
|
|
||||||
|
:return Boolean for success
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create client connection
|
||||||
|
s3 = boto3.client('s3',
|
||||||
|
region_name=region_name,
|
||||||
|
aws_access_key_id=aws_access_key,
|
||||||
|
aws_secret_access_key=aws_secret_key)
|
||||||
|
|
||||||
|
# Create the bucket
|
||||||
|
try:
|
||||||
|
s3.create_bucket(Bucket=bucket_name,
|
||||||
|
CreateBucketConfiguration={
|
||||||
|
'LocationConstraint':region_name
|
||||||
|
})
|
||||||
|
return True
|
||||||
|
except (s3.exceptions.BucketAlreadyExists, s3.exceptions.BucketAlreadyOwnedByYou):
|
||||||
|
return True
|
||||||
|
except Exception as ex:
|
||||||
|
logging.error("[CREATE S3 BUCKET] [%s] [%s]", bucket_name, str(ex))
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def configure_website(bucket_name, aws_access_key, aws_secret_key):
|
||||||
|
"""
|
||||||
|
configure_website
|
||||||
|
|
||||||
|
:param bucket_name: The name of the bucket to work with
|
||||||
|
:param aws_access_key: Authentication access key
|
||||||
|
:param aws_secret_key: Authentication secret key
|
||||||
|
|
||||||
|
:return Boolean for success
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create client connection
|
||||||
|
s3 = boto3.client('s3',
|
||||||
|
aws_access_key_id=aws_access_key,
|
||||||
|
aws_secret_access_key=aws_secret_key)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Enable website hosting
|
||||||
|
s3.put_bucket_website(
|
||||||
|
Bucket=bucket_name,
|
||||||
|
WebsiteConfiguration={
|
||||||
|
'IndexDocument': {'Suffix': 'index.html'},
|
||||||
|
'ErrorDocument': {'Key': 'index.html'},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return True
|
||||||
|
except Exception as ex:
|
||||||
|
logging.error("[WEBSITE ENABLE S3 BUCKET] [%s] [%s]", bucket_name, str(ex))
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def grant_public_access(bucket_name, aws_access_key, aws_secret_key):
|
||||||
|
"""
|
||||||
|
grant_public_access
|
||||||
|
|
||||||
|
:param bucket_name: The name of the bucket to work with
|
||||||
|
:param aws_access_key: Authentication access key
|
||||||
|
:param aws_secret_key: Authentication secret key
|
||||||
|
|
||||||
|
:return Boolean for success
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create client connection
|
||||||
|
s3 = boto3.client('s3',
|
||||||
|
aws_access_key_id=aws_access_key,
|
||||||
|
aws_secret_access_key=aws_secret_key)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Update policy
|
||||||
|
s3.put_public_access_block(Bucket=bucket_name,
|
||||||
|
PublicAccessBlockConfiguration={
|
||||||
|
'BlockPublicAcls': False,
|
||||||
|
'IgnorePublicAcls': False,
|
||||||
|
'BlockPublicPolicy': False,
|
||||||
|
'RestrictPublicBuckets': False
|
||||||
|
})
|
||||||
|
|
||||||
|
# Set public access policy
|
||||||
|
public_access_policy = {
|
||||||
|
"Version": "2012-10-17",
|
||||||
|
"Statement": [
|
||||||
|
{
|
||||||
|
"Effect": "Allow",
|
||||||
|
"Principal": "*",
|
||||||
|
"Action": "s3:GetObject",
|
||||||
|
"Resource": f"arn:aws:s3:::{bucket_name}/*"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
# Update policy
|
||||||
|
s3.put_bucket_policy(Bucket=bucket_name, Policy=json.dumps(public_access_policy))
|
||||||
|
|
||||||
|
return True
|
||||||
|
except Exception as ex:
|
||||||
|
logging.error("[PUBLIC POLICY ENABLE S3 BUCKET] [%s] [%s]", bucket_name, str(ex))
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def get_web_url(bucket_name, region_name):
|
||||||
|
"""
|
||||||
|
get_web_url
|
||||||
|
|
||||||
|
:param bucket_name: The name of the bucket to work with
|
||||||
|
:param region_name: The region the bucket belongs
|
||||||
|
|
||||||
|
:return String of a URL
|
||||||
|
"""
|
||||||
|
return f"{bucket_name}.s3-website-{region_name}.amazonaws.com"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def get_cloudfront_distribution(bucket_name, aws_access_key, aws_secret_key):
|
||||||
|
"""
|
||||||
|
get_cloudfront_distribution
|
||||||
|
|
||||||
|
:param bucket_name: The name of the bucket to work with
|
||||||
|
:param aws_access_key: Authentication access key
|
||||||
|
:param aws_secret_key: Authentication secret key
|
||||||
|
|
||||||
|
:return New Distribution ID or None if failed
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create client connection
|
||||||
|
cloudfront = boto3.client('cloudfront',
|
||||||
|
aws_access_key_id=aws_access_key,
|
||||||
|
aws_secret_access_key=aws_secret_key)
|
||||||
|
|
||||||
|
paginator = cloudfront.get_paginator('list_distributions')
|
||||||
|
for page in paginator.paginate():
|
||||||
|
# Check if there are distributions in the current page
|
||||||
|
if 'DistributionList' in page and 'Items' in page['DistributionList']:
|
||||||
|
for distribution in page['DistributionList']['Items']:
|
||||||
|
if distribution.get('Comment') == bucket_name[:127]:
|
||||||
|
return distribution
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def create_cloudfront_distribution(bucket_name, region_name, logging_bucket,
|
||||||
|
aws_access_key, aws_secret_key):
|
||||||
|
"""
|
||||||
|
create_cloudfront_distribution
|
||||||
|
|
||||||
|
:param bucket_name: The name of the bucket to work with
|
||||||
|
:param region_name: The region the bucket belongs
|
||||||
|
:param logging_bucket: Bucket to send logs to
|
||||||
|
:param aws_access_key: Authentication access key
|
||||||
|
:param aws_secret_key: Authentication secret key
|
||||||
|
|
||||||
|
:return New Distribution ID or None if failed
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create client connection
|
||||||
|
cloudfront = boto3.client('cloudfront',
|
||||||
|
aws_access_key_id=aws_access_key,
|
||||||
|
aws_secret_access_key=aws_secret_key)
|
||||||
|
|
||||||
|
# Create CloudFront distribution
|
||||||
|
distribution_config = {
|
||||||
|
'CallerReference': bucket_name,
|
||||||
|
'Origins': {
|
||||||
|
'Quantity': 1,
|
||||||
|
'Items': [
|
||||||
|
{
|
||||||
|
'Id': get_web_url(bucket_name, region_name),
|
||||||
|
'DomainName': get_web_url(bucket_name, region_name),
|
||||||
|
'OriginPath': '',
|
||||||
|
'CustomOriginConfig': {
|
||||||
|
'HTTPPort': 80,
|
||||||
|
'HTTPSPort': 443,
|
||||||
|
'OriginProtocolPolicy': 'http-only'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
'DefaultRootObject': 'index.html',
|
||||||
|
'DefaultCacheBehavior': {
|
||||||
|
'TargetOriginId': get_web_url(bucket_name, region_name),
|
||||||
|
'ViewerProtocolPolicy': 'redirect-to-https',
|
||||||
|
'CachePolicyId': '4135ea2d-6df8-44a3-9df3-4b5a84be39ad'
|
||||||
|
},
|
||||||
|
'Logging': {
|
||||||
|
'Enabled': True,
|
||||||
|
'IncludeCookies': False,
|
||||||
|
'Bucket': logging_bucket,
|
||||||
|
'Prefix': bucket_name,
|
||||||
|
},
|
||||||
|
'Comment': bucket_name[:127],
|
||||||
|
'Enabled': True
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
response = cloudfront.create_distribution(DistributionConfig=distribution_config)
|
||||||
|
return response['Distribution']
|
||||||
|
except cloudfront.exceptions.DistributionAlreadyExists as ee:
|
||||||
|
logging.error("[DISTRIBUTIONALREADYEXISTS] [%s] [%s]", bucket_name, str(ee))
|
||||||
|
return get_cloudfront_distribution(bucket_name, aws_access_key, aws_secret_key)
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def create_certificate(domain_name, region_name, aws_access_key, aws_secret_key):
|
||||||
|
"""
|
||||||
|
create_certificate
|
||||||
|
|
||||||
|
:param domain_name: The name of the domain to create cert for
|
||||||
|
:param region_name: The region the bucket belongs
|
||||||
|
:param aws_access_key: Authentication access key
|
||||||
|
:param aws_secret_key: Authentication secret key
|
||||||
|
|
||||||
|
:return New Certificate or None if failed
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create client connection
|
||||||
|
acm = boto3.client('acm',
|
||||||
|
region_name=region_name,
|
||||||
|
aws_access_key_id=aws_access_key,
|
||||||
|
aws_secret_access_key=aws_secret_key)
|
||||||
|
|
||||||
|
# Request a new ACM certificate
|
||||||
|
response = acm.request_certificate(
|
||||||
|
DomainName=domain_name,
|
||||||
|
ValidationMethod='DNS',
|
||||||
|
)
|
||||||
|
|
||||||
|
# Wait a few seconds. New AWS certificates take a little to create.
|
||||||
|
time.sleep(10)
|
||||||
|
|
||||||
|
# Get the certificate ARN
|
||||||
|
return response['CertificateArn']
|
||||||
|
|
||||||
|
|
||||||
|
def get_certificate(certificate_arn, region_name, aws_access_key, aws_secret_key):
|
||||||
|
"""
|
||||||
|
get_certificate
|
||||||
|
|
||||||
|
:param certificate_arn: The certificate ARN
|
||||||
|
:param region_name: The region the bucket belongs
|
||||||
|
:param aws_access_key: Authentication access key
|
||||||
|
:param aws_secret_key: Authentication secret key
|
||||||
|
|
||||||
|
:return New Certificate or None if failed
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create client connection
|
||||||
|
acm = boto3.client('acm',
|
||||||
|
region_name=region_name,
|
||||||
|
aws_access_key_id=aws_access_key,
|
||||||
|
aws_secret_access_key=aws_secret_key)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Get certificate
|
||||||
|
new_certificate_req = acm.describe_certificate(CertificateArn=certificate_arn)
|
||||||
|
return new_certificate_req['Certificate']
|
||||||
|
except Exception as ex:
|
||||||
|
logging.error("[CERTIFICATE GET] [%s] [%s]", certificate_arn, str(ex))
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_certificate_dns(certificate_arn, domain_name, region_name, aws_access_key, aws_secret_key):
|
||||||
|
"""
|
||||||
|
get_certificate_dns
|
||||||
|
|
||||||
|
:param certificate_arn: The certificate ARN
|
||||||
|
:param domain_name: The name of the domain to create cert for
|
||||||
|
:param region_name: The region the bucket belongs
|
||||||
|
:param aws_access_key: Authentication access key
|
||||||
|
:param aws_secret_key: Authentication secret key
|
||||||
|
|
||||||
|
:return New Certificate DNS
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create client connection
|
||||||
|
acm = boto3.client('acm',
|
||||||
|
region_name=region_name,
|
||||||
|
aws_access_key_id=aws_access_key,
|
||||||
|
aws_secret_access_key=aws_secret_key)
|
||||||
|
|
||||||
|
# Get certificate
|
||||||
|
new_certificate_req = acm.describe_certificate(CertificateArn=certificate_arn)
|
||||||
|
new_certificate = new_certificate_req['Certificate']
|
||||||
|
|
||||||
|
# Get the DNS information
|
||||||
|
certificate_dns = []
|
||||||
|
if new_certificate:
|
||||||
|
for dvo in new_certificate.get('DomainValidationOptions', []):
|
||||||
|
if not dvo.get('DomainName') == domain_name:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Get the data needed for DNS update and save to list
|
||||||
|
resource_record_type = dvo.get('ResourceRecord', {}).get('Type', '')
|
||||||
|
resource_record_name = dvo.get('ResourceRecord', {}).get('Name', '')
|
||||||
|
resource_record_value = dvo.get('ResourceRecord', {}).get('Value', '')
|
||||||
|
|
||||||
|
certificate_dns.append({
|
||||||
|
"type": resource_record_type,
|
||||||
|
"name": resource_record_name,
|
||||||
|
"value": resource_record_value
|
||||||
|
})
|
||||||
|
|
||||||
|
# Return results
|
||||||
|
return certificate_dns
|
||||||
|
|
||||||
|
|
||||||
|
def update_cloudfront_distribution(distribution_id, aliases, acm_certificate_arn,
|
||||||
|
aws_access_key, aws_secret_key):
|
||||||
|
"""
|
||||||
|
update_cloudfront_distribution
|
||||||
|
|
||||||
|
:param distribution_id:
|
||||||
|
:param aliases:
|
||||||
|
:param acm_certificate_arn:
|
||||||
|
:param aws_access_key: Authentication access key
|
||||||
|
:param aws_secret_key: Authentication secret key
|
||||||
|
|
||||||
|
:return Boolean for success
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create client connection
|
||||||
|
cloudfront = boto3.client('cloudfront',
|
||||||
|
aws_access_key_id=aws_access_key,
|
||||||
|
aws_secret_access_key=aws_secret_key)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Get Distribution and update values
|
||||||
|
distribution_config = cloudfront.get_distribution_config(Id=distribution_id)
|
||||||
|
distribution_config['DistributionConfig']['Aliases']['Items'] = aliases
|
||||||
|
distribution_config['DistributionConfig']['Aliases']['Quantity'] = len(aliases)
|
||||||
|
|
||||||
|
# Update ViewerCertificate
|
||||||
|
new_viewercertificate = distribution_config['DistributionConfig']['ViewerCertificate']
|
||||||
|
new_viewercertificate['CloudFrontDefaultCertificate'] = False
|
||||||
|
new_viewercertificate['CertificateSource'] = "acm"
|
||||||
|
new_viewercertificate['ACMCertificateArn'] = acm_certificate_arn
|
||||||
|
new_viewercertificate['SSLSupportMethod'] = "sni-only"
|
||||||
|
new_viewercertificate['MinimumProtocolVersion'] = "TLSv1.2_2021"
|
||||||
|
distribution_config['DistributionConfig']['ViewerCertificate'] = new_viewercertificate
|
||||||
|
|
||||||
|
# Update the Distribution
|
||||||
|
cloudfront.update_distribution(
|
||||||
|
DistributionConfig=distribution_config['DistributionConfig'],
|
||||||
|
Id=distribution_id,
|
||||||
|
IfMatch=distribution_config['ETag']
|
||||||
|
)
|
||||||
|
|
||||||
|
return True
|
||||||
|
except Exception as ex:
|
||||||
|
logging.error("[DISTRIBUTION UPDATE] [%s] [%s]", distribution_id, str(ex))
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
@ -0,0 +1,140 @@
|
||||||
|
"""
|
||||||
|
Functions for MiroSite setup
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import tarfile
|
||||||
|
import tempfile
|
||||||
|
import os
|
||||||
|
import mimetypes
|
||||||
|
|
||||||
|
import boto3
|
||||||
|
from jinja2 import Environment, FileSystemLoader
|
||||||
|
|
||||||
|
|
||||||
|
def upload_website(bucket_name, website_dir, aws_access_key, aws_secret_key):
|
||||||
|
"""
|
||||||
|
upload_website
|
||||||
|
|
||||||
|
:param bucket_name: The name of the bucket to work with
|
||||||
|
:param website_dir: Folder of website files
|
||||||
|
:param aws_access_key: Authentication access key
|
||||||
|
:param aws_secret_key: Authentication secret key
|
||||||
|
|
||||||
|
:return
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create client connection
|
||||||
|
s3 = boto3.client('s3',
|
||||||
|
aws_access_key_id=aws_access_key,
|
||||||
|
aws_secret_access_key=aws_secret_key)
|
||||||
|
|
||||||
|
# Loop all the files
|
||||||
|
for root, _, files in os.walk(website_dir):
|
||||||
|
for file in files:
|
||||||
|
try:
|
||||||
|
local_path = os.path.join(root, file)
|
||||||
|
s3_path = os.path.relpath(local_path, website_dir)
|
||||||
|
|
||||||
|
# Determine the Content-Type based on file extension
|
||||||
|
content_type, _ = mimetypes.guess_type(local_path)
|
||||||
|
content_type = content_type or 'application/octet-stream'
|
||||||
|
|
||||||
|
# Upload file to S3
|
||||||
|
s3.upload_file(local_path,
|
||||||
|
bucket_name,
|
||||||
|
s3_path,
|
||||||
|
ExtraArgs={'ContentType': content_type})
|
||||||
|
except Exception as ex:
|
||||||
|
logging.error("[FILE UPLOAD] [%s] [%s] [%s]", bucket_name, file, str(ex))
|
||||||
|
|
||||||
|
|
||||||
|
def download_website_gzip(bucket_name, region_name, website_gzip_name,
|
||||||
|
aws_access_key, aws_secret_key):
|
||||||
|
"""
|
||||||
|
download_website_gzip
|
||||||
|
|
||||||
|
:param bucket_name: The name of the bucket to work with
|
||||||
|
:param region_name: The region of the bucket
|
||||||
|
:param website_gzip_name: Name of file to download
|
||||||
|
:param aws_access_key: Authentication access key
|
||||||
|
:param aws_secret_key: Authentication secret key
|
||||||
|
|
||||||
|
:return Boolean for success
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create client connection
|
||||||
|
s3 = boto3.client('s3',
|
||||||
|
endpoint_url=f'https://{region_name}.linodeobjects.com',
|
||||||
|
aws_access_key_id=aws_access_key,
|
||||||
|
aws_secret_access_key=aws_secret_key)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Download file from S3
|
||||||
|
s3.download_file(bucket_name,
|
||||||
|
website_gzip_name,
|
||||||
|
website_gzip_name)
|
||||||
|
return True
|
||||||
|
except Exception as ex:
|
||||||
|
logging.error("[WEBSITE GZIP DOWNLOAD] [%s] [%s] [%s]", bucket_name,
|
||||||
|
website_gzip_name,
|
||||||
|
str(ex))
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def extract_website(website_gzip):
|
||||||
|
"""
|
||||||
|
extract_website
|
||||||
|
|
||||||
|
:param website_gzip:
|
||||||
|
|
||||||
|
:return Folder of extracted website
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Create a temporary directory
|
||||||
|
extract_path = tempfile.mkdtemp()
|
||||||
|
|
||||||
|
with tarfile.open(website_gzip, 'r:gz') as tar:
|
||||||
|
tar.extractall(extract_path)
|
||||||
|
|
||||||
|
return extract_path
|
||||||
|
|
||||||
|
|
||||||
|
def apply_template_website(website_dir, template_dict):
|
||||||
|
"""
|
||||||
|
apply_template_website
|
||||||
|
|
||||||
|
:param website_dir: Folder of website files
|
||||||
|
:param template_dict: Template dict
|
||||||
|
|
||||||
|
:return
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Setup skip dirs
|
||||||
|
skip_dirs = [os.path.join(website_dir, "img")]
|
||||||
|
|
||||||
|
# Loop all the files
|
||||||
|
for root, _, files in os.walk(website_dir):
|
||||||
|
# Skip certain dirs
|
||||||
|
if root in skip_dirs:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Setup jinja2 loader
|
||||||
|
env = Environment(loader=FileSystemLoader(root))
|
||||||
|
|
||||||
|
# Loop all files in directory
|
||||||
|
for file in files:
|
||||||
|
try:
|
||||||
|
# Build file path
|
||||||
|
local_path = os.path.join(root, file)
|
||||||
|
|
||||||
|
# Get jinja2 template
|
||||||
|
template = env.get_template(file)
|
||||||
|
rendered_content = template.render(template_data=template_dict)
|
||||||
|
|
||||||
|
# Write the rendered file
|
||||||
|
with open(local_path, 'w', encoding='utf-8') as outfile:
|
||||||
|
outfile.write(rendered_content)
|
||||||
|
except Exception as ex:
|
||||||
|
logging.error("[FILE JINJA] [%s] [%s]", file, str(ex))
|
||||||
5
setup.py
5
setup.py
|
|
@ -9,7 +9,7 @@ with open('README.md', 'r') as f:
|
||||||
|
|
||||||
setup(
|
setup(
|
||||||
name='lib_afc_unify',
|
name='lib_afc_unify',
|
||||||
version='0.1.0',
|
version='0.1.1',
|
||||||
author='',
|
author='',
|
||||||
author_email='',
|
author_email='',
|
||||||
description='',
|
description='',
|
||||||
|
|
@ -20,6 +20,7 @@ setup(
|
||||||
packages=find_packages(),
|
packages=find_packages(),
|
||||||
python_requires='>=3.7',
|
python_requires='>=3.7',
|
||||||
install_requires=[
|
install_requires=[
|
||||||
|
'boto3==1.33.6',
|
||||||
|
'Jinja2==3.1.3',
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue