From 6792538172aa979d1f944ec45d14c3fe2d070456 Mon Sep 17 00:00:00 2001 From: Rick Ross Date: Mon, 17 Jun 2024 10:37:08 -0700 Subject: [PATCH] Added Assets class in development for library --- lib_af_api/__init__.py | 1 + lib_af_api/assets.py | 162 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 163 insertions(+) create mode 100644 lib_af_api/assets.py diff --git a/lib_af_api/__init__.py b/lib_af_api/__init__.py index b03b595..1ee8ec8 100644 --- a/lib_af_api/__init__.py +++ b/lib_af_api/__init__.py @@ -12,3 +12,4 @@ from lib_af_api.token import Token from lib_af_api.voterset import VoterSet from lib_af_api.voter import Voter from lib_af_api.user import User +from lib_af_api.assets import Assets diff --git a/lib_af_api/assets.py b/lib_af_api/assets.py new file mode 100644 index 0000000..109f6fd --- /dev/null +++ b/lib_af_api/assets.py @@ -0,0 +1,162 @@ +""" +Assets Class object +""" + +import logging +import requests +import urllib.parse +import xmltodict + + +class Assets(): + """Assets class""" + + def __init__(self, base_url, jwt): + """ + __init__ Init Assets + + :param base_url: Base URL for SimpleStorageService API + :param jwt: JWT Authentication for SimpleStorageService API + """ + + self.base_url = base_url + self.jwt = jwt + + + def get_list(self, business_key, campaign_key): + """ + get Get list of assets for the campaign + + :param business_key: The Business key + :param campaign_key: The Campaign key + """ + + # Set the headers for the request + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + 'Authorization': f'Bearer {self.jwt}' + } + + args = { + 'business_key': business_key, + 'campaign_key': campaign_key + } + + # Get the list URL + response = requests.get( + f'{self.base_url}/api/v1/files', + headers=headers, + params=args, + timeout=300 + ) + + # Good response. Get the list + if response.ok: + assets = {} + url_json = response.json() + + if 'list_url' in url_json: + + headers = { + 'Content-Type': '', + 'Accept': 'application/xml' + } + + list_response = requests.get( + url_json['list_url'], + headers=headers, + timeout=300 + ) + + # The json version of the returned list is an invalid structure and doesn't parse correctly. + # So, we get an XML structure and convert to a properly structured JSON + if list_response.ok: + bucket_items_dict = xmltodict.parse(list_response.content) + + #assets = {} + #print(f"ListBucketResult:") + assets['bucket_name'] = bucket_items_dict['ListBucketResult']['Name'] + assets['key_prefix'] = bucket_items_dict['ListBucketResult']['Prefix'] + # for k,v in bucket_items_dict['ListBucketResult'].items(): + # if k == "Contents": + # print("Contents:") + # for i,el in enumerate(v): + # print(f"\tItem {i}:") + # for k2,v2 in el.items(): + # print(f"\t {k2}: {v2}") + # else: + # print(f"{k} = {v}") + + asset_list = [] + if 'ListBucketResult' in bucket_items_dict: + if 'Contents' in bucket_items_dict['ListBucketResult']: + for asset in bucket_items_dict['ListBucketResult']['Contents']: + name_parts = asset['Key'].rsplit('%2F') + asset_list.append({'key': asset['Key'], 'filename': urllib.parse.unquote_plus(name_parts[-1])}) + assets['asset_list'] = asset_list + + # print("\nassets:") + # print(f"\tbucket_name: {assets['bucket_name']}") + # print(f"\t key_prefix: {assets['key_prefix']}") + # if len(assets['asset_list']) > 0: + # print(f"\t asset_list:") + # for i,a in enumerate(assets['asset_list']): + # print(f"\t\t{i:02d}: {a['filename']}") + # else: + # print(f"\t asset_list: (no assets being stored)") + + # print("") + + return assets + + # Log error and return null + logging.error("[AFCASSETS GET LIST] [%s]", response.text) + logging.error("[AFCASSETS GET LIST] [%s]", response.request.url) + return None + + + def file_copyevent(self, business_key, campaign_key, event_key, src_file, dest_file): + """ + file_copyevent: Copy a file from Campaign asset library into an event + + :param business_key: The Business key + :param campaign_key: The Campaign key + :param event_key: The Event key + :param src_file: The Source filename + :param dst_file: The Destination filename + """ + + # Set the headers for the request + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + 'Authorization': f'Bearer {self.jwt}' + } + + args = { + 'business_key': business_key, + 'campaign_key': campaign_key, + 'event_key': event_key, + 'src_file': src_file, + 'dest_file': dest_file + } + + print(f"[AFCASSETS]: args = {args}") + + # Send the copy command + response = requests.get( + f'{self.base_url}/api/v1/file/copyevent', + headers=headers, + params=args, + timeout=300 + ) + + # Good response. Get the list + if response.ok: + return response.content + + # Log error and return null + logging.error("[AFCASSETS FILE_COPYEVENT] [%s]", response.text) + logging.error("[AFCASSETS FILE_COPYEVENT] [%s]", response.request.url) + return None