first commit

Nolan Burfield 2025-04-14 12:33:35 -07:00
commit 24809abcc1
8 changed files with 880 additions and 0 deletions

23
.gitignore vendored Normal file

@@ -0,0 +1,23 @@
venv/
test-venv/
*.pyc
__pycache__/
instance/
.pytest_cache/
.coverage
htmlcov/
dist/
build/
*.egg-info/
node_modules/
.envs
session_files/
mongo_data/
test_data/

1
README.md Normal file

@@ -0,0 +1 @@
Library AFR Mosaic

9
lib_afr_mosaic/__init__.py Executable file

@@ -0,0 +1,9 @@
"""
lib_afc_mosaic
---------------------------------------
A Python library for working with AFC Mosaic files
"""
from lib_afc_mosaic.universe_map_file import AFCMUniverseMapFile


@@ -0,0 +1,210 @@
import sys
import json

import requests
# Silence the SSL certificate warnings from requests (verify=False is used below)
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class AFCBusiness:
    def __init__(self, api_url=None, api_jwt=None, username=None, password=None):
        # Update this as we modify the library for newer versions
        self.api_version = 'v1'
        self.api_jwt = api_jwt
        # Default to the dev endpoint when no api_url is passed in
        # Prod: https://afaccount.roosevelt.hyperscaleaf.com/api/
        self.api_url = api_url or "https://afaccount.eisenhower.hyperscaleaf.com/api/"
        self.credentials = {}
        self.credentials['user_name'] = username
        self.credentials['password'] = password
        self.headers = {}
        self.headers['content-type'] = "application/json"
        self.headers['accept'] = "application/json"
        if self.api_jwt is None:
            if self.credentials['user_name'] and self.credentials['password']:
                self.Set_JWT()
            else:
                print("Unable to connect to the API without a JWT or authentication credentials")
                sys.exit(1)
        self.headers['authorization'] = "Bearer " + self.api_jwt
        self.params = {}

    def Set_JWT(self):
        url = f"{self.api_url}{self.api_version}/auth/authenticate"
        try:
            response = requests.post(url, headers=self.headers, json=self.credentials)
        except requests.exceptions.RequestException as e:
            print(f"exception: {e}")
            sys.exit(1)
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            print(f"http error: {err}")
            sys.exit(1)
        rj = response.json()
        self.api_jwt = rj['jwt']

    def Submit_Get(self, full_url, input_params=None):
        if not isinstance(full_url, str):
            print("Submit_Get() error: must provide the full url string")
            return None
        try:
            self.response = requests.get(full_url, verify=False, headers=self.headers,
                                         params=input_params or {})
        except requests.exceptions.RequestException as e:
            print(f"exception: {e}")
            return None
        try:
            self.response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            print(f"http error: {err}")
            return None
        return self.response

    def Submit_Post(self, full_url, input_params=None, data=None):
        if not isinstance(full_url, str):
            print("Submit_Post() error: must provide the full url string")
            return None
        try:
            # Copy the headers so the per-request Content-Type does not mutate self.headers
            this_headers = dict(self.headers)
            this_headers['Content-Type'] = 'application/json'
            self.response = requests.post(full_url, verify=False, headers=this_headers,
                                          data=json.dumps(data or {}))
        except requests.exceptions.RequestException as e:
            print(f"exception: {e}")
            return None
        try:
            self.response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            print(f"http error: {err}")
            print(f"\nself.response.attributes = {self.response.__dict__}")
            return None
        return self.response

    def Submit_Patch(self, full_url, input_params=None, data=None):
        if not isinstance(full_url, str):
            print("Submit_Patch() error: must provide the full url string")
            return None
        try:
            this_headers = dict(self.headers)
            this_headers['Content-Type'] = 'application/json'
            self.response = requests.patch(full_url, verify=False, headers=this_headers,
                                           data=json.dumps(data or {}))
        except requests.exceptions.RequestException as e:
            print(f"exception: {e}")
            return None
        try:
            self.response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            print(f"http error: {err}")
            print(f"\nself.response.attributes = {self.response.__dict__}")
            return None
        return self.response

    def GetBusinesses(self):
        business_url = f"{self.api_url}{self.api_version}/businesses"
        input_params = {}
        # offset number in the list
        #input_params['start'] = 0
        # max number of businesses to return in the list
        input_params['limit'] = 200
        # filter to these status values
        #input_params['status'] = ['active', 'archived']
        r = self.Submit_Get(business_url, input_params)
        if r is None:
            return None
        results = r.json()
        if 'values' in results:
            return results['values']
        return None

    def PostBusiness(self, business=None):
        business_url = f"{self.api_url}{self.api_version}/businesses"
        input_params = {}
        # Expected shape of the business object:
        # {
        #     "name": "string",
        #     "account_id_legacy": "string",
        #     "primary_contact_name": "string",
        #     "primary_contact_phone": "string",
        #     "primary_contact_email": "string",
        #     "company_public_name": "string",
        #     "logo_url": "string",
        #     "bandwidth_campaign_id": "string",
        #     "reports_required": true,
        #     "sms_base_price_contract": 0,
        #     "mms_base_price_contract": 0,
        #     "mms_included_characters": 0,
        #     "afc_connect_base_price_contract": 0,
        #     "afc_crystal_pricing": 0
        # }
        r = self.Submit_Post(business_url, input_params, data=business)
        if r is None:
            return None
        results = r.json()
        return results

    def PatchBusiness(self, business_key=None, fields=None):
        if business_key is None:
            print("Can't patch a business without the business key")
            return None
        if fields is None:
            print("Can't patch a business without a dict of fields")
            return None
        business_url = f"{self.api_url}{self.api_version}/businesses/{business_key}"
        r = self.Submit_Patch(business_url, input_params={}, data=fields)
        if r is None:
            return None
        results = r.json()
        return results
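
A minimal usage sketch for AFCBusiness, assuming the dev endpoint above is reachable; the module path, username, password, and business key below are hypothetical placeholders:

    from afc_business import AFCBusiness  # module name assumed; the filename is not shown in this diff

    api = AFCBusiness(username="example_user", password="example_pass")
    businesses = api.GetBusinesses()
    for b in businesses or []:
        print(b.get('name', b))
    # Update a single field on one business
    api.PatchBusiness(business_key="example-key", fields={"reports_required": True})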


@@ -0,0 +1,6 @@
"""
AF Mosaic exceptions
"""
class AFCMUniverseMapFileError(Exception):
"""AFCMUniverseMapFileError class. Custom Exception object."""

45
lib_afr_mosaic/helpers.py Normal file

@@ -0,0 +1,45 @@
"""
Helper functions for the Mosaic library
"""
import csv
import codecs
def read_voterdata_csv_stream(input_stream, keys_needed=None) -> list:
"""
read_voterdata_csv_stream: Read csv stream to list
:param input_stream: Input stream to read from
:return List of parsed csv rows
"""
rows = []
# note: using 'utf-8-sig' removes the annoying \ufeff BOM signature
if keys_needed is None or len(keys_needed) > 0:
key_list = keys_needed
reader = csv.DictReader(codecs.getreader("utf-8-sig")(input_stream))
for row in reader:
if key_list is None:
key_list = row.keys()
new_row = {}
for k in key_list:
new_row[k] = row[k]
rows.append(new_row)
return rows
def write_csv_file(full_file_pathname, fieldnames, rows):
"""
write_csv_file
:param full_file_pathname:
:param fieldnames:
"""
with open(full_file_pathname, 'w', newline='', encoding='UTF-8') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
for row in rows:
writer.writerow(row)
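
A quick usage sketch for the two helpers, assuming a local file opened in binary mode stands in for the S3 stream the library normally passes; the filenames are hypothetical:

    from lib_afr_mosaic.helpers import read_voterdata_csv_stream, write_csv_file

    with open("voters.csv", "rb") as f:
        rows = read_voterdata_csv_stream(f, keys_needed=["UID", "Cell_Phone"])
    if rows:
        write_csv_file("/tmp/voters_subset.csv", rows[0].keys(), rows)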


@@ -0,0 +1,561 @@
"""
AFCMUniverseMapFile Class object
"""
import json
import os
import logging
import uuid
import time
import codecs
import phonenumbers
from lib_afc_mosaic.helpers import (
read_voterdata_csv_stream, write_csv_file
)
class AFCMUniverseMapFile:
"""
AFCMUniverseMapFile class
"""
def __init__( self, mosaic_filename=None ):
"""
__init__
:param mosaic_filename:
"""
self.mosaic_filename = mosaic_filename
self.mosaic_head = {}
self.metadata = {}
self.mosaic_file_list = {}
self.mosaic_postprocess = {}
self.files = []
self.voterset_s3_connection = None
self.voterset_s3_bucket = None
self.optout_s3_connection = None
self.optout_s3_bucket = None
self.cleaned_import_rows = []
self.removed_row_count = 0
self.processed_row_count = 0
self.final_rows = []
# Watch for duplicates across files
self.running_phone_numbers = {}
# Track error numbers
self.error_numbers = {}
self.stats = {
"incoming_count": { "total": 0 },
"removed_count": { "total": 0 },
"cleaned_count": { "total": 0 },
"mosaic_count": { "total": 0 },
"removed_details": {},
"timings": {}
}
    def set_voterset_s3(self, s3_connection, bucket_name):
        """
        set_voterset_s3 Set the S3 information for the 'voterset' bucket
        :param s3_connection: S3 Connection object
        :param bucket_name: Bucket name
        """
        self.voterset_s3_connection = s3_connection
        self.voterset_s3_bucket = bucket_name

    def set_optout_s3(self, s3_connection, bucket_name):
        """
        set_optout_s3 Set the S3 information for the 'optout' bucket
        :param s3_connection: S3 Connection object
        :param bucket_name: Bucket name
        """
        self.optout_s3_connection = s3_connection
        self.optout_s3_bucket = bucket_name

    def read_mosaic_map_file(self, filename):
        """
        read_mosaic_map_file
        """
        if filename == '' or filename is None:
            return None
        with open(filename, encoding='utf-8-sig') as json_file:
            data = json.load(json_file)
        if 'mosaic' in data:
            self.mosaic_head = data['mosaic']
        if 'metadata' in data:
            self.metadata = data['metadata']
        if 'files' in data:
            self.mosaic_file_list = data['files']
        if 'postprocess' in data:
            self.mosaic_postprocess = data['postprocess']
        return data

    def read_mosaic_map_dict(self, map_dict: dict):
        """
        read_mosaic_map_dict
        """
        if not isinstance(map_dict, dict):
            return None
        self.mosaic_head = map_dict.get('mosaic', {})
        self.metadata = map_dict.get('metadata', {})
        self.mosaic_file_list = map_dict.get('files', {})
        self.mosaic_postprocess = map_dict.get('postprocess', {})
        return None
    def write_mosaic_map_file(self, filename):
        """
        write_mosaic_map_file
        """
        if filename == '' or filename is None:
            return None
        output_object = json.dumps({"mosaic": self.mosaic_head,
                                    "metadata": self.metadata,
                                    "files": self.mosaic_file_list,
                                    "postprocess": self.mosaic_postprocess}, indent=4)
        with open(filename, 'w', encoding="UTF-8") as outfile:
            outfile.write(output_object)
        return None

    def create_new_input_file(self, filename=None):
        """
        create_new_input_file Return a template entry for one input file;
        see the illustrative filled-in mapping after this method
        """
        mosaic_input_file = {
            "filename": filename,
            "details": {
                "csv_get_url": None,
                "csv_get_prepped_url": None,
                "csv_get_removed_url": None,
                "csv_put_url": None,
                "mapping": {
                    "UID": None,
                    "FName": None,
                    "LName": None,
                    "Cell_Phone": None,
                    "Party": None,
                    "State": None,
                    "Precinct": None,
                    "County": None,
                    "URL": None
                },
                "operations": {
                    "insert": {"field": "*", "operator": "=", "value": "*"}
                }
            }
        }
        return mosaic_input_file
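
    # An illustrative, filled-in mapping (all values hypothetical), matching the
    # three mapping styles process_file() consumes ('field', 'combine', 'value'):
    #     "mapping": {
    #         "UID": {"field": "voter_id"},
    #         "FName": {"field": "first_name"},
    #         "LName": {"field": "last_name"},
    #         "Cell_Phone": {"combine": {"field1": "area_code", "field2": "phone"}},
    #         "Party": {"value": "REP"},
    #         "State": {"field": "state"},
    #         "Precinct": None,
    #         "County": {"field": "county"},
    #         "URL": None
    #     }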
    def add_new_input_file(self, ifo):
        """
        add_new_input_file
        """
        if ifo is None:
            return
        self.files.append(ifo)
    def generate_mapped_file(self):
        """
        generate_mapped_file Generate the mapped file details
        """
        errors_filename_s3_key = "errors.json"
        try:
            file_data_stream = self.optout_s3_connection.get_object(self.optout_s3_bucket,
                                                                    errors_filename_s3_key)
            self.error_numbers = json.load(codecs.getreader("utf-8")(file_data_stream))
        except Exception as ex:
            raise Exception(f"universe_map_file.generate_mapped_file: Failed to get "
                            f"{self.optout_s3_bucket}/{errors_filename_s3_key}: {ex}") from ex
        if self.metadata.get("optout_s3_key") is not None:
            try:
                file_data_stream = self.optout_s3_connection.get_object(self.optout_s3_bucket,
                                                                        self.metadata["optout_s3_key"])
                optout_numbers = json.load(codecs.getreader("utf-8")(file_data_stream))
                self.error_numbers.update(optout_numbers)
            except Exception:  # pylint: disable=broad-except
                logging.info("Optout key not found, skipping %s/%s",
                             self.optout_s3_bucket, self.metadata['optout_s3_key'])
        # Loop the files from the map
        for ufilename in self.mosaic_file_list:
            self.process_file(ufilename)
        # Perform post-processing
        self.postprocessing()
        # Run final steps
        self.write_final_file()
        self.write_stats_file()

    def postprocessing(self):
        """
        postprocessing Performs actions for any postprocessing rules defined in the voterset map file
        Structure in the map file:
            "postprocess": {
                "range": {
                    "start": 0,
                    "end": 10000
                }
            }
        """
        # TODO: figure out how to handle rules in a defined order
        # If no range is defined, this copies the entire list from self.cleaned_import_rows
        row_range = self.mosaic_postprocess.get('range', {})
        self.final_rows = self.cleaned_import_rows[row_range.get('start'):row_range.get('end')]
    def process_file(self, voterset_filename):
        """
        process_file Process the file of the VoterSet
        :param voterset_filename: The VoterSet filename to process
        """
        overall_time_start = time.time()
        if 'mapping' not in self.mosaic_file_list[voterset_filename]:
            logging.debug("Missing 'mapping' key, skipping %s", voterset_filename)
            return None
        mapping = self.mosaic_file_list[voterset_filename]['mapping']
        if 'Cell_Phone' not in mapping:
            logging.debug("'Cell_Phone' not mapped, skipping %s", voterset_filename)
            return None
        if 'field' not in mapping['Cell_Phone'] and 'combine' not in mapping['Cell_Phone']:
            logging.debug("No field or combination mapped to 'Cell_Phone', skipping %s",
                          voterset_filename)
            return None
        # Faster & less memory to only read the keys we need from the CSV
        csv_keys_needed = []
        for k, v in mapping.items():
            if isinstance(v, dict):
                if 'field' in v:
                    csv_keys_needed.append(v['field'])
                elif 'combine' in v:
                    for k1, v1 in v['combine'].items():
                        if k1.startswith('field'):
                            csv_keys_needed.append(v1)
        amplify_rows = []
        removed_rows = []
        reading_time_start = time.time()
        voterset_filename_s3_key = os.path.join(self.metadata.get("s3_key", ""), voterset_filename)
        try:
            file_data_stream = self.voterset_s3_connection.get_object(self.voterset_s3_bucket,
                                                                      voterset_filename_s3_key)
        except Exception as ex:
            raise Exception(f"universe_map_file.process_file: Failed to get "
                            f"{self.voterset_s3_bucket}/{voterset_filename_s3_key}: {ex}") from ex
        rows = read_voterdata_csv_stream(file_data_stream, csv_keys_needed)
        reading_time = time.time() - reading_time_start
        logging.debug("rows = %s", len(rows))
        # Update incoming stats after reading this file map
        self.stats["incoming_count"][voterset_filename] = len(rows)
        self.stats["incoming_count"]["total"] += self.stats["incoming_count"][voterset_filename]
        processing_time_start = time.time()
        for i, r in enumerate(rows):
            if 'combine' in mapping['Cell_Phone']:
                cell_phone_num = (f"{r[mapping['Cell_Phone']['combine']['field1']]}"
                                  f"{r[mapping['Cell_Phone']['combine']['field2']]}")
            else:
                cell_phone_num = r[mapping['Cell_Phone']['field']]
            # Check that this row has a parseable phone number
            try:
                parsed_number = phonenumbers.parse(cell_phone_num, 'US')
            except Exception:  # pylint: disable=broad-except
                reason = 'could not parse phone number'
                r['afc_error'] = f"bad row #{i} in {voterset_filename}; reason='{reason}'"
                removed_rows.append(r)
                self.stats["removed_details"][reason] = \
                    self.stats["removed_details"].get(reason, 0) + 1
                continue
            # Is it a possible phone number?
            if phonenumbers.is_possible_number(parsed_number):
                possible_number = parsed_number
            else:
                reason = 'invalid phone number'
                r['afc_error'] = f"bad row #{i} in {voterset_filename}; reason='{reason}'"
                removed_rows.append(r)
                self.stats["removed_details"][reason] = \
                    self.stats["removed_details"].get(reason, 0) + 1
                continue
            # Is it a valid phone number?
            if phonenumbers.is_valid_number(possible_number):
                formatted_number = phonenumbers.format_number(possible_number,
                                                              phonenumbers.PhoneNumberFormat.E164)
            else:
                reason = 'invalid phone number'
                r['afc_error'] = f"bad row #{i} in {voterset_filename}; reason='{reason}'"
                removed_rows.append(r)
                self.stats["removed_details"][reason] = \
                    self.stats["removed_details"].get(reason, 0) + 1
                continue
            # Look up carrier errors and optouts
            if formatted_number in self.error_numbers:
                reason = self.error_numbers[formatted_number]
                r['afc_error'] = f"bad row #{i} in {voterset_filename}; reason='{reason}'"
                removed_rows.append(r)
                self.stats["removed_details"][reason] = \
                    self.stats["removed_details"].get(reason, 0) + 1
                continue
            # Last check: make sure we haven't added this number already
            if formatted_number in self.running_phone_numbers:
                reason = 'duplicate phone number'
                r['afc_error'] = f"bad row #{i} in {voterset_filename}; reason='{reason}'"
                removed_rows.append(r)
                self.stats["removed_details"][reason] = \
                    self.stats["removed_details"].get(reason, 0) + 1
                continue
            new_row = {
                'UID': 'n/a',
                'FName': 'n/a',
                'LName': 'n/a',
                'Party': 'n/a',
                'State': 'n/a',
                'Precinct': 'n/a',
                'County': 'n/a',
                'URL': 'n/a'
            }
            # Map each value into new_row
            for amplify_key in list(new_row):
                if mapping.get(amplify_key) is None:
                    continue
                if 'value' in mapping[amplify_key]:
                    new_row[amplify_key] = mapping[amplify_key]['value']
                elif 'field' in mapping[amplify_key]:
                    old_key = mapping[amplify_key]['field']
                    if old_key != '' and old_key in r:
                        new_row[amplify_key] = r[old_key]
                else:
                    logging.error("Unrecognized mapping for %s: %s",
                                  amplify_key,
                                  mapping[amplify_key])
            if new_row['UID'] is None or new_row['UID'].strip() in ['n/a', '']:
                new_row['UID'] = str(uuid.uuid4())
            new_row['Cell_Phone'] = formatted_number
            amplify_rows.append(new_row)
            self.cleaned_import_rows.append(new_row)
            self.running_phone_numbers[formatted_number] = True
        processing_time = time.time() - processing_time_start
        (filename_prefix, _extension) = os.path.splitext(voterset_filename)
        # Write the *_PREPPED.csv file of cleaned rows
        writing_cleaned_time_start = time.time()
        prep_file_name = f"{filename_prefix}_PREPPED.csv"
        prep_full_pathname = f"/tmp/{prep_file_name}"
        if len(amplify_rows) > 0:
            try:
                write_csv_file(prep_full_pathname, amplify_rows[0].keys(), amplify_rows)
            except Exception as ex:  # pylint: disable=broad-except
                print(f"[WRITING {prep_file_name}] [{str(ex)}]")
            # write to S3
            this_s3_key = os.path.join(self.metadata.get("s3_key", ""), prep_file_name)
            self.voterset_s3_connection.upload_file(filepathname=prep_full_pathname,
                                                    bucket=self.voterset_s3_bucket,
                                                    key=this_s3_key)
            # remove the temp file now
            os.remove(prep_full_pathname)
        writing_cleaned_time = time.time() - writing_cleaned_time_start
        # Write the *_REMOVED.csv file of rejected rows
        writing_removed_time_start = time.time()
        removed_file_name = f"{filename_prefix}_REMOVED.csv"
        removed_full_pathname = f"/tmp/{removed_file_name}"
        if len(removed_rows) > 0:
            try:
                write_csv_file(removed_full_pathname, removed_rows[0].keys(), removed_rows)
            except Exception as ex:  # pylint: disable=broad-except
                print(f"[WRITING {removed_file_name}] [{str(ex)}]")
            # write to S3
            this_s3_key = os.path.join(self.metadata.get("s3_key", ""), removed_file_name)
            self.voterset_s3_connection.upload_file(filepathname=removed_full_pathname,
                                                    bucket=self.voterset_s3_bucket,
                                                    key=this_s3_key)
            # remove the temp file now
            os.remove(removed_full_pathname)
        writing_removed_time = time.time() - writing_removed_time_start
        self.removed_row_count += len(removed_rows)
        self.processed_row_count += len(rows)
        # Update stats for this file
        self.stats['cleaned_count'][voterset_filename] = len(amplify_rows)
        self.stats['cleaned_count']["total"] += self.stats["cleaned_count"][voterset_filename]
        self.stats['removed_count'][voterset_filename] = len(removed_rows)
        self.stats['removed_count']["total"] += self.stats["removed_count"][voterset_filename]
        overall_time = time.time() - overall_time_start
        self.stats["timings"][voterset_filename] = {
            "reading": reading_time,
            "processing": processing_time,
            "writing cleaned file": writing_cleaned_time,
            "writing removed file": writing_removed_time,
            "file total": overall_time
        }
        return None
    def write_final_file(self):
        """
        write_final_file Writes the final CSV to S3
        """
        # Set the count of final voters
        self.metadata["total_rows"] = len(self.final_rows)
        # Write the FINAL file
        final_filename = self.metadata.get('mosaic_filename', 'MOSAIC.csv')
        if len(self.final_rows) > 0:
            final_full_pathname = f"/tmp/{final_filename}"
            write_csv_file(final_full_pathname, self.final_rows[0].keys(), self.final_rows)
            # write to S3
            this_s3_key = os.path.join(self.metadata.get("s3_key", ""), final_filename)
            self.voterset_s3_connection.upload_file(filepathname=final_full_pathname,
                                                    bucket=self.voterset_s3_bucket,
                                                    key=this_s3_key)
            # remove the temp file now
            os.remove(final_full_pathname)
        self.stats['mosaic_count'][final_filename] = len(self.final_rows)
        self.stats['mosaic_count']["total"] = self.stats["mosaic_count"][final_filename]

    def write_stats_file(self):
        """
        write_stats_file Writes the stats JSON to S3
        """
        # Set the counts of stat data
        self.metadata["cleaned_count"] = self.stats['cleaned_count']["total"]
        self.metadata["total_removed"] = self.stats['removed_count']["total"]
        # Write the STATS file
        stats_filename = "mosaic_stats.json"
        s3_key = os.path.join(self.metadata.get("s3_key", ""), stats_filename)
        json_data = json.dumps(self.stats, indent=4).encode('UTF-8')
        self.voterset_s3_connection.put_object(bucket=self.voterset_s3_bucket,
                                               key=s3_key,
                                               data=json_data)

    def get_mosaic_map_dict(self) -> dict:
        """
        get_mosaic_map_dict Generate the dictionary for the VoterSet mosaic map json
        :return: A generated dict
        """
        return {
            "mosaic": self.mosaic_head,
            "metadata": self.metadata,
            "files": self.mosaic_file_list,
            "postprocess": self.mosaic_postprocess
        }

    def get_voter_object(self, voterset_key: str, campaign_key: str) -> dict:
        """
        get_voter_object Build the API voter object from the first final row
        :param voterset_key: Voterset key of the voter
        :param campaign_key: Campaign key of the voter
        :return: Dictionary object of the AF-VOTER
        """
        # Check if there is data
        if len(self.final_rows) < 1:
            return None
        # Build the AF Voter object
        voter_row = self.final_rows[0]
        filter_set = {'UID', 'FName', 'LName', 'Cell_Phone'}
        data_fields_dict = {k: voter_row[k] for k in voter_row if k not in filter_set}
        voter_dict = {
            "voter_id": voter_row['UID'],
            "first_name": voter_row['FName'],
            "last_name": voter_row['LName'],
            "voterset_keys": [voterset_key],
            "campaign_key": campaign_key,
            "data_fields": data_fields_dict,
            "cell_phone": voter_row['Cell_Phone']
        }
        return voter_dict
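
A minimal end-to-end sketch of the intended workflow, assuming an s3_conn object exposing the get_object/upload_file/put_object methods used above; the connection object, bucket names, and map filename are hypothetical:

    from lib_afr_mosaic import AFCMUniverseMapFile

    umf = AFCMUniverseMapFile(mosaic_filename="MOSAIC.csv")
    umf.set_voterset_s3(s3_conn, "voterset-bucket")
    umf.set_optout_s3(s3_conn, "optout-bucket")
    umf.read_mosaic_map_file("universe_map.json")
    # Cleans each mapped file, applies any postprocess range, then writes the
    # *_PREPPED/*_REMOVED files, the final mosaic CSV, and mosaic_stats.json to S3
    umf.generate_mapped_file()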

25
setup.py Normal file

@@ -0,0 +1,25 @@
"""
Project build definition file.
"""
from setuptools import setup, find_packages
with open('README.md', 'r') as f:
long_description = f.read()
setup(
name='lib_afr_mosaic',
version='1.0.0',
author='',
author_email='',
description='',
long_description=long_description,
long_description_content_type='text/markdown',
zip_safe=False,
include_package_data=False,
packages=find_packages(),
python_requires='>=3.7',
install_requires=[
'phonenumbers==8.13.26',
],
)