lib-af-s3storage/lib_afc_s3storage/s3storage.py

"""
S3Storage Class object
"""
import os
import logging
import boto3
from lib_afc_s3storage.afs3exception import AFS3Error


class S3Storage:
    """
    S3Storage class: a thin wrapper around a boto3 S3 client configured for
    Linode Object Storage endpoints.
    """

    def __init__( self, access_key=None, secret_access_key=None, data_center="us-lax-1" ):
        """
        __init__ Init S3 Storage class.
        :param access_key: S3 Bucket access key
        :param secret_access_key: S3 Bucket secret access key
        :param data_center: S3 Bucket data center. Default us-lax-1
        """
        self.access_key = access_key
        self.secret_access_key = secret_access_key
        self.data_center = data_center
        self.client = None
        if self.access_key is not None and self.secret_access_key is not None:
            self.connect()

    def connect( self ):
        """
        connect: create a boto3 S3 client for the configured data center's
        Linode Object Storage endpoint.
        """
        # Generate an error if the keys are empty or missing, rather than
        # letting boto3 fail later with a less descriptive credentials error.
        if not self.access_key or not self.secret_access_key:
            raise AFS3Error("S3Storage.connect(): missing access_key or secret_access_key")
        linode_obj_config = {
            "aws_access_key_id": self.access_key,
            "aws_secret_access_key": self.secret_access_key,
            "endpoint_url": f"https://{self.data_center}.linodeobjects.com",
        }
        self.client = boto3.client("s3", **linode_obj_config)

    def connect_data_center( self, data_center ):
        """
        connect_data_center: switch to another data center and reconnect if
        credentials are available.
        """
        # "us-southeast-1" = Atlanta, GA
        # "us-ord-1" = Chicago, IL
        # "us-lax-1" = Los Angeles, CA
        # "us-mia-1" = Miami, FL
        # "us-east-1" = Newark, NJ
        # "us-sea-1" = Seattle, WA
        # "us-iad-1" = Washington, DC
        self.data_center = data_center
        if self.access_key is not None and self.secret_access_key is not None:
            self.connect()

    def list_buckets( self ):
        """
        list_buckets: return the list of buckets, or None if not connected.
        """
        if self.client is None:
            return None
        r = self.client.list_buckets()
        logging.debug("response keys = %s", str(r.keys()))
        return r['Buckets']

    def list_objects( self, bucket=None ):
        """
        list_objects: return the object listing for a bucket, or None if not
        connected or no bucket was given.
        """
        if self.client is None or bucket is None:
            return None
        r = self.client.list_objects(Bucket=bucket)
        # An empty bucket has no 'Contents' key in the response.
        return r.get('Contents', [])

    def upload_file( self, filepathname=None, bucket=None, key=None ):
        """
        upload_file: upload a local file to a bucket under the given key,
        stored with a private ACL.
        """
        if self.client is None:
            raise AFS3Error("S3Storage.upload_file(): not connected")
        if filepathname is None:
            raise AFS3Error("S3Storage.upload_file(): missing required filepathname")
        if not os.path.exists(filepathname):
            raise AFS3Error(f"S3Storage.upload_file(): invalid filepathname '{filepathname}'")
        if bucket is None:
            raise AFS3Error("S3Storage.upload_file(): missing required bucket name")
        if key is None:
            raise AFS3Error("S3Storage.upload_file(): missing required key name")
        self.client.upload_file(Filename=filepathname,
                                Bucket=bucket,
                                Key=key,
                                ExtraArgs={'ACL': 'private'})
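
    # Example call (bucket name and paths are hypothetical), assuming the
    # client has already connected:
    #
    #   storage.upload_file(filepathname="/tmp/report.pdf",
    #                       bucket="my-bucket",
    #                       key="reports/report.pdf")
    #
    # The 'ACL': 'private' ExtraArgs above keeps uploaded objects private.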

    def upload_folder( self ):
        """
        upload_folder: placeholder, not implemented yet.
        """
        return

    def get_object( self, bucket=None, filename=None ):
        """
        get_object: fetch an object and return its body as a streaming
        response.
        """
        if self.client is None:
            raise AFS3Error("S3Storage.get_object(): not connected")
        if bucket is None:
            raise AFS3Error("S3Storage.get_object(): missing required bucket name")
        if filename is None:
            raise AFS3Error("S3Storage.get_object(): missing required filename")
        this_object = self.client.get_object(Bucket=bucket, Key=filename)
        return this_object['Body']
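
    # get_object() returns a botocore StreamingBody. A caller might read it
    # like this (sketch, names are hypothetical):
    #
    #   body = storage.get_object(bucket="my-bucket", filename="reports/report.pdf")
    #   data = body.read()               # whole object into memory
    #   # or stream it:
    #   # for chunk in body.iter_chunks():
    #   #     handle(chunk)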

    def delete_all_objects( self, bucket=None ):
        """
        delete_all_objects: delete every object in a bucket.
        """
        if self.client is None:
            raise AFS3Error("S3Storage.delete_all_objects(): not connected")
        if bucket is None:
            raise AFS3Error("S3Storage.delete_all_objects(): missing required bucket name")
        objects = self.client.list_objects(Bucket=bucket)
        # An empty bucket has no 'Contents' key in the response.
        for obj in objects.get('Contents', []):
            self.client.delete_object(Bucket=bucket, Key=obj['Key'])
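

if __name__ == "__main__":
    # Minimal usage sketch. The environment variable names and the bucket
    # handling below are assumptions for illustration, not part of the library.
    logging.basicConfig(level=logging.DEBUG)
    storage = S3Storage(
        access_key=os.environ.get("LINODE_S3_ACCESS_KEY"),
        secret_access_key=os.environ.get("LINODE_S3_SECRET_KEY"),
        data_center="us-lax-1",
    )
    # list_buckets() returns None when no credentials were supplied.
    for bucket_info in storage.list_buckets() or []:
        print(bucket_info['Name'])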