""" S3Storage Class object """ import os import logging import boto3 import csv import smart_open from lib_afc_s3storage.afs3exception import AFS3Error class S3Storage: """ S3Storage class """ def __init__( self, access_key=None, secret_access_key=None, data_center="us-lax-1" ): """ __init__ Init S3 Storage class. :param access_key: S3 Bucket access key :param secret_access_key: S3 Bucket secret key :param data_center: S3 Bucket data center. Default us-lax-1 """ self.access_key = access_key self.secret_access_key = secret_access_key self.data_center = data_center self.client = None if self.access_key is not None and self.secret_access_key is not None: self.connect() def connect( self ): """ connect """ # generate an error if keys are empty linode_obj_config = { "aws_access_key_id": self.access_key, "aws_secret_access_key": self.secret_access_key, "endpoint_url": f"https://{self.data_center}.linodeobjects.com", } self.client = boto3.client("s3", **linode_obj_config) def connect_data_center( self, data_center ): """ connect_data_center """ # "us-southeast-1" = Atlanta, GA # "us-ord-1" = Chicago, IL # "us-lax-1" = Los Angeles, CA # "us-mia-1" = Miami, FL # "us-east-1" = Newark, NJ # "us-sea-1" = Seattle, WA # "us-iad-1" = Washington, DC self.data_center = data_center if self.access_key is not None and self.secret_access_key is not None: self.connect() def list_buckets( self ): """ list_buckets """ if self.client is None: return None r = self.client.list_buckets() logging.debug("response keys = %s", str(r.keys())) return r['Buckets'] def list_objects( self, bucket=None ): """ list_objects """ if self.client is None or bucket is None: return None r = self.client.list_objects(Bucket=bucket) return r['Contents'] def upload_file( self, filepathname=None, bucket=None, key=None ): """ upload_file """ if filepathname is None: raise AFS3Error("S3Storage.upload_file(): missing required filepathname") if not os.path.exists(filepathname): raise AFS3Error(f"S3Storage.upload_file(): invalid filepathname '{filepathname}'") if bucket is None: raise AFS3Error("S3Storage.upload_file(): missing required bucket name") if key is None: raise AFS3Error("S3Storage.upload_file(): missing required key name") self.client.upload_file(Filename=filepathname, Bucket=bucket, Key=key, ExtraArgs={'ACL': 'private'}) def upload_folder( self ): """ upload_folder """ return def get_object( self, bucket=None, filename=None ): """ get_object """ if bucket is None: raise AFS3Error("S3Storage.get_object(): missing required bucket name") if filename is None: raise AFS3Error("S3Storage.get_object(): missing required filename") this_object = self.client.get_object(Bucket=bucket, Key=filename) return this_object['Body'] def put_object( self, bucket=None, key=None, data=None ): """ put_object """ if bucket == None: raise AFS3Error("S3Storage.put_object(): missing required bucket name") if key == None: raise AFS3Error("S3Storage.put_object(): missing required key") if data == None: raise AFS3Error("S3Storage.put_object(): missing required data") response = self.client.put_object(Bucket=bucket, Key=key, Body=data) # response looks like this: # response = {'ResponseMetadata': # {'RequestId': 'tx00000ed21e851e5b2e313-00656c222c-32c0e88-default', 'HostId': '', 'HTTPStatusCode': 200, # 'HTTPHeaders': {'date': 'Sun, 03 Dec 2023 06:37:32 GMT', # 'content-length': '0', # 'connection': 'keep-alive', # 'etag': '"5d6ef8943a9f853a82247c9e87152c1f"', # 'accept-ranges': 'bytes', # 'x-amz-request-id': 'tx00000ed21e851e5b2e313-00656c222c-32c0e88-default'}, # 'RetryAttempts': 0}, # 'ETag': '"5d6ef8943a9f853a82247c9e87152c1f"'} return response def put_csv_data( self, bucket=None, key=None, rowdata=None ): if bucket == None: raise AFS3Error("S3Storage.put_csv_data(): missing required bucket name") if key == None: raise AFS3Error("S3Storage.put_csv_data(): missing required key") # should validate 'rowdata' is a list of dicts if rowdata == None: raise AFS3Error("S3Storage.put_csv_data(): missing required rowdata") s3Path = f"s3://{bucket}/{key}" with smart_open.open(s3Path, 'w', transport_params={'client': self.client}) as ofh: writer = csv.DictWriter(ofh, fieldnames=rowdata[0].keys()) writer.writeheader() for row in rowdata: writer.writerow(row) return def delete_all_objects( self, bucket=None ): """ delete_all_objects """ if bucket == None: raise AFS3Error("S3Storage.delete_all_objects(): missing required bucket name") objects = self.client.list_objects(Bucket=bucket) for obj in objects['Contents']: self.client.delete_object(Bucket=bucket, Key=obj['Key']) return