Compare commits

..

2 Commits

Author SHA1 Message Date
Nolan Burfield 20740e836e update setups 2024-11-29 13:31:29 -08:00
Nolan Burfield 5a9eec02ad add v3 for event patch 2024-11-29 13:27:45 -08:00
4 changed files with 42 additions and 166 deletions

View File

@ -12,4 +12,3 @@ from lib_af_api.token import Token
from lib_af_api.voterset import VoterSet
from lib_af_api.voter import Voter
from lib_af_api.user import User
from lib_af_api.assets import Assets

View File

@ -1,162 +0,0 @@
"""
Assets Class object
"""
import logging
import requests
import urllib.parse
import xmltodict
class Assets():
"""Assets class"""
def __init__(self, base_url, jwt):
"""
__init__ Init Assets
:param base_url: Base URL for SimpleStorageService API
:param jwt: JWT Authentication for SimpleStorageService API
"""
self.base_url = base_url
self.jwt = jwt
def get_list(self, business_key, campaign_key):
"""
get Get list of assets for the campaign
:param business_key: The Business key
:param campaign_key: The Campaign key
"""
# Set the headers for the request
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': f'Bearer {self.jwt}'
}
args = {
'business_key': business_key,
'campaign_key': campaign_key
}
# Get the list URL
response = requests.get(
f'{self.base_url}/api/v1/files',
headers=headers,
params=args,
timeout=300
)
# Good response. Get the list
if response.ok:
assets = {}
url_json = response.json()
if 'list_url' in url_json:
headers = {
'Content-Type': '',
'Accept': 'application/xml'
}
list_response = requests.get(
url_json['list_url'],
headers=headers,
timeout=300
)
# The json version of the returned list is an invalid structure and doesn't parse correctly.
# So, we get an XML structure and convert to a properly structured JSON
if list_response.ok:
bucket_items_dict = xmltodict.parse(list_response.content)
#assets = {}
#print(f"ListBucketResult:")
assets['bucket_name'] = bucket_items_dict['ListBucketResult']['Name']
assets['key_prefix'] = bucket_items_dict['ListBucketResult']['Prefix']
# for k,v in bucket_items_dict['ListBucketResult'].items():
# if k == "Contents":
# print("Contents:")
# for i,el in enumerate(v):
# print(f"\tItem {i}:")
# for k2,v2 in el.items():
# print(f"\t {k2}: {v2}")
# else:
# print(f"{k} = {v}")
asset_list = []
if 'ListBucketResult' in bucket_items_dict:
if 'Contents' in bucket_items_dict['ListBucketResult']:
for asset in bucket_items_dict['ListBucketResult']['Contents']:
name_parts = asset['Key'].rsplit('%2F')
asset_list.append({'key': asset['Key'], 'filename': urllib.parse.unquote_plus(name_parts[-1])})
assets['asset_list'] = asset_list
# print("\nassets:")
# print(f"\tbucket_name: {assets['bucket_name']}")
# print(f"\t key_prefix: {assets['key_prefix']}")
# if len(assets['asset_list']) > 0:
# print(f"\t asset_list:")
# for i,a in enumerate(assets['asset_list']):
# print(f"\t\t{i:02d}: {a['filename']}")
# else:
# print(f"\t asset_list: (no assets being stored)")
# print("")
return assets
# Log error and return null
logging.error("[AFCASSETS GET LIST] [%s]", response.text)
logging.error("[AFCASSETS GET LIST] [%s]", response.request.url)
return None
def file_copyevent(self, business_key, campaign_key, event_key, src_file, dest_file):
"""
file_copyevent: Copy a file from Campaign asset library into an event
:param business_key: The Business key
:param campaign_key: The Campaign key
:param event_key: The Event key
:param src_file: The Source filename
:param dst_file: The Destination filename
"""
# Set the headers for the request
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': f'Bearer {self.jwt}'
}
args = {
'business_key': business_key,
'campaign_key': campaign_key,
'event_key': event_key,
'src_file': src_file,
'dest_file': dest_file
}
print(f"[AFCASSETS]: args = {args}")
# Send the copy command
response = requests.get(
f'{self.base_url}/api/v1/file/copyevent',
headers=headers,
params=args,
timeout=300
)
# Good response. Get the list
if response.ok:
return response.content
# Log error and return null
logging.error("[AFCASSETS FILE_COPYEVENT] [%s]", response.text)
logging.error("[AFCASSETS FILE_COPYEVENT] [%s]", response.request.url)
return None

View File

@ -165,3 +165,42 @@ class Event():
return response.json(), response.status_code
except: # pylint: disable=bare-except
return None, response.status_code
def patch_v3(self, event_key, patch_data):
"""
patch_v3 Path the Event V3.
:param event_key: The Event key to update with API
:param patch_data: The data to update the Event
"""
# Set the headers for the request
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Authorization': f'Bearer {self.jwt}'
}
# Set the key to the patch data
patch_data['key'] = event_key
# Run the GET
response = requests.patch(
f'{self.base_url}/api/v1/events/{event_key}',
headers=headers,
timeout=30,
json=patch_data
)
# Good response. Return Event
if response.ok:
return response.json(), response.status_code
# Log error and retunr null
logging.error("[AFEVENT UPDATE] [%s]", response.text)
try:
return response.json(), response.status_code
except: # pylint: disable=bare-except
return None, response.status_code

View File

@ -9,7 +9,7 @@ with open('README.md', 'r') as f:
setup(
name='lib_af_api',
version='1.0.4',
version='1.0.5',
author='',
author_email='',
description='',
@ -20,7 +20,7 @@ setup(
packages=find_packages(),
python_requires='>=3.7',
install_requires=[
'pytz==2024.1',
'requests==2.31.0'
'pytz>=2024.1',
'requests>=2.32.3'
],
)