"""
|
|
S3Storage Class object
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
|
|
import boto3
|
|
import csv
|
|
import smart_open
|
|
|
|
from lib_afc_s3storage.afs3exception import AFS3Error
|
|
|
|
|
|
class S3Storage:
    """
    S3Storage class.

    Thin wrapper around a boto3 S3 client configured for Linode Object Storage.
    """

    def __init__( self, access_key=None, secret_access_key=None, data_center="us-lax-1" ):
        """
        __init__ Init S3 Storage class.

        :param access_key: S3 Bucket access key
        :param secret_access_key: S3 Bucket secret key
        :param data_center: S3 Bucket data center. Default us-lax-1
        """

        self.access_key = access_key
        self.secret_access_key = secret_access_key
        self.data_center = data_center

        self.client = None

        if self.access_key is not None and self.secret_access_key is not None:
            self.connect()

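    # Usage sketch (illustrative only; the environment variable names below are
    # assumptions, not something this module defines):
    #
    #   storage = S3Storage(os.environ.get("LINODE_S3_ACCESS_KEY"),
    #                       os.environ.get("LINODE_S3_SECRET_KEY"))
    #   # With both keys supplied, __init__ calls connect() automatically.
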
    def connect( self ):
        """
        connect: build the boto3 S3 client for the configured data center.
        """

        # generate an error if keys are empty
        if self.access_key is None or self.secret_access_key is None:
            raise AFS3Error("S3Storage.connect(): missing required access_key/secret_access_key")

        linode_obj_config = {
            "aws_access_key_id": self.access_key,
            "aws_secret_access_key": self.secret_access_key,
            "endpoint_url": f"https://{self.data_center}.linodeobjects.com",
        }
        self.client = boto3.client("s3", **linode_obj_config)

    def connect_data_center( self, data_center ):
        """
        connect_data_center: switch to another data center and reconnect.
        """

        # "us-southeast-1" = Atlanta, GA
        # "us-ord-1"       = Chicago, IL
        # "us-lax-1"       = Los Angeles, CA
        # "us-mia-1"       = Miami, FL
        # "us-east-1"      = Newark, NJ
        # "us-sea-1"       = Seattle, WA
        # "us-iad-1"       = Washington, DC

        self.data_center = data_center
        if self.access_key is not None and self.secret_access_key is not None:
            self.connect()

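    # Example (a sketch; the region choice is illustrative): switching rebuilds
    # self.client against the newly selected region's endpoint.
    #
    #   storage.connect_data_center("us-ord-1")   # reconnect to Chicago, IL
    #   storage.list_buckets()                    # now lists buckets in us-ord-1
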
    def list_buckets( self ):
        """
        list_buckets: return the list of buckets visible to these credentials.
        """

        if self.client is None:
            return None

        r = self.client.list_buckets()
        logging.debug("response keys = %s", str(r.keys()))

        return r['Buckets']

    def list_objects( self, bucket=None ):
        """
        list_objects: return the object list for a bucket (at most 1000 keys).
        """

        if self.client is None or bucket is None:
            return None

        r = self.client.list_objects(Bucket=bucket)

        # 'Contents' is absent when the bucket is empty
        return r.get('Contents', [])

    def list_objects_v2( self, bucket=None, prefix=None ):
        """
        list_objects_v2: return the object list for a bucket, optionally
        filtered by a key prefix.
        """

        if self.client is None or bucket is None:
            return None

        # boto3 rejects Prefix=None, so only pass it when a prefix was given
        if prefix is None:
            r = self.client.list_objects_v2(Bucket=bucket)
        else:
            r = self.client.list_objects_v2(Bucket=bucket, Prefix=prefix)

        # 'Contents' is absent when nothing matches
        return r.get('Contents', [])

    def upload_file( self, filepathname=None, bucket=None, key=None ):
        """
        upload_file: upload a local file to bucket/key with a private ACL.
        """

        if filepathname is None:
            raise AFS3Error("S3Storage.upload_file(): missing required filepathname")
        if not os.path.exists(filepathname):
            raise AFS3Error(f"S3Storage.upload_file(): invalid filepathname '{filepathname}'")

        if bucket is None:
            raise AFS3Error("S3Storage.upload_file(): missing required bucket name")

        if key is None:
            raise AFS3Error("S3Storage.upload_file(): missing required key name")

        self.client.upload_file(Filename=filepathname,
                                Bucket=bucket,
                                Key=key,
                                ExtraArgs={'ACL': 'private'})

    def upload_folder( self ):
        """
        upload_folder (not yet implemented)
        """
        return

    def get_object( self, bucket=None, filename=None ):
        """
        get_object: return the streaming body of an object.
        """

        if bucket is None:
            raise AFS3Error("S3Storage.get_object(): missing required bucket name")

        if filename is None:
            raise AFS3Error("S3Storage.get_object(): missing required filename")

        this_object = self.client.get_object(Bucket=bucket, Key=filename)

        return this_object['Body']

    def put_object( self, bucket=None, key=None, data=None ):
        """
        put_object: write the given data to bucket/key.
        """

        if bucket is None:
            raise AFS3Error("S3Storage.put_object(): missing required bucket name")

        if key is None:
            raise AFS3Error("S3Storage.put_object(): missing required key")

        if data is None:
            raise AFS3Error("S3Storage.put_object(): missing required data")

        response = self.client.put_object(Bucket=bucket, Key=key, Body=data)

        # response looks like this:
        # response = {'ResponseMetadata':
        #   {'RequestId': 'tx00000ed21e851e5b2e313-00656c222c-32c0e88-default', 'HostId': '', 'HTTPStatusCode': 200,
        #    'HTTPHeaders': {'date': 'Sun, 03 Dec 2023 06:37:32 GMT',
        #                    'content-length': '0',
        #                    'connection': 'keep-alive',
        #                    'etag': '"5d6ef8943a9f853a82247c9e87152c1f"',
        #                    'accept-ranges': 'bytes',
        #                    'x-amz-request-id': 'tx00000ed21e851e5b2e313-00656c222c-32c0e88-default'},
        #    'RetryAttempts': 0},
        #  'ETag': '"5d6ef8943a9f853a82247c9e87152c1f"'}

        return response

    def put_csv_data( self, bucket=None, key=None, rowdata=None ):
        """
        put_csv_data: stream 'rowdata' (a list of dicts) to bucket/key as CSV.
        """

        if bucket is None:
            raise AFS3Error("S3Storage.put_csv_data(): missing required bucket name")

        if key is None:
            raise AFS3Error("S3Storage.put_csv_data(): missing required key")

        # should validate 'rowdata' is a list of dicts
        if rowdata is None:
            raise AFS3Error("S3Storage.put_csv_data(): missing required rowdata")

        s3Path = f"s3://{bucket}/{key}"
        with smart_open.open(s3Path, 'w', transport_params={'client': self.client}) as ofh:
            writer = csv.DictWriter(ofh, fieldnames=rowdata[0].keys())
            writer.writeheader()
            for row in rowdata:
                writer.writerow(row)

        return

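    # Example (bucket and key names are hypothetical): the header row comes from
    # the keys of the first dict, so every row should share the same keys.
    #
    #   rows = [{"name": "alpha", "count": 1},
    #           {"name": "beta",  "count": 2}]
    #   storage.put_csv_data(bucket="example-bucket", key="reports/counts.csv", rowdata=rows)
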
    def delete_all_objects( self, bucket=None ):
        """
        delete_all_objects: delete every object returned by list_objects
        (a single page, i.e. at most 1000 keys).
        """

        if bucket is None:
            raise AFS3Error("S3Storage.delete_all_objects(): missing required bucket name")

        objects = self.client.list_objects(Bucket=bucket)
        # 'Contents' is absent when the bucket is already empty
        for obj in objects.get('Contents', []):
            self.client.delete_object(Bucket=bucket, Key=obj['Key'])

        return

    def download_all_objects( self, bucket=None, prefix=None, localdir=None):
        """
        download_all_objects: download every object whose key starts with
        'prefix' into 'localdir', mirroring the key path below the prefix.
        """

        if bucket is None:
            raise AFS3Error("S3Storage.download_all_objects(): missing required bucket name")

        if prefix is None:
            raise AFS3Error("S3Storage.download_all_objects(): missing required key prefix string")

        if localdir is None:
            raise AFS3Error("S3Storage.download_all_objects(): missing required localdir name")

        objects = self.client.list_objects_v2(Bucket=bucket, Prefix=prefix)
        # keys in 'objects' are: ['ResponseMetadata', 'IsTruncated', 'Contents', 'Name', 'Prefix', 'MaxKeys', 'EncodingType', 'KeyCount']

        for obj in objects.get('Contents', []):
            # keys in 'obj' are: ['Key', 'LastModified', 'ETag', 'Size', 'StorageClass']

            lessPrefix = obj['Key'].split(prefix)[-1].lstrip('/')

            fullPathname = os.path.join(localdir, lessPrefix)
            mkDirPath = os.path.dirname(fullPathname)

            if mkDirPath and not os.path.exists(mkDirPath):
                #print(f"making dir {mkDirPath}")
                os.makedirs(mkDirPath)
            #print(f"downloading {fullPathname}")
            self.client.download_file(Bucket=bucket, Key=obj['Key'], Filename=fullPathname)

        return

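    # Example (bucket, prefix, and local path are hypothetical):
    #
    #   storage.download_all_objects(bucket="example-bucket",
    #                                prefix="reports/2023",
    #                                localdir="/tmp/reports")
    #   # key "reports/2023/q4/summary.csv" ends up at "/tmp/reports/q4/summary.csv"
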
    def download_object( self, bucket=None, key=None, filename=None ):
        """
        download_object: download a single object to a local file.
        """

        if bucket is None:
            raise AFS3Error("S3Storage.download_object(): missing required bucket name")

        if key is None:
            raise AFS3Error("S3Storage.download_object(): missing required key")

        if filename is None:
            raise AFS3Error("S3Storage.download_object(): missing required filename")

        # download_file() returns None on success and raises on failure,
        # so there is no response payload to pass back
        self.client.download_file(Bucket=bucket, Key=key, Filename=filename)

        return
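

if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the library proper. The environment
    # variable names, bucket name, and key below are assumptions for
    # illustration only; adjust them for a real account before running.
    logging.basicConfig(level=logging.DEBUG)

    demo_access_key = os.environ.get("LINODE_S3_ACCESS_KEY")
    demo_secret_key = os.environ.get("LINODE_S3_SECRET_KEY")

    if demo_access_key is None or demo_secret_key is None:
        print("Set LINODE_S3_ACCESS_KEY and LINODE_S3_SECRET_KEY to run the demo")
    else:
        storage = S3Storage(demo_access_key, demo_secret_key)

        for bucket_info in storage.list_buckets():
            print(bucket_info['Name'])

        # round-trip a small object through a hypothetical bucket
        storage.put_object(bucket="example-bucket", key="demo/hello.txt", data=b"hello")
        body = storage.get_object(bucket="example-bucket", filename="demo/hello.txt")
        print(body.read())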