# lib-afr-mosaic/lib_afr_mosaic/universe_map_file.py

"""
AFCMUniverseMapFile Class object
"""
import json
import os
import logging
import uuid
import time
import codecs
import phonenumbers
from lib_afr_mosaic.helpers import (
read_voterdata_csv_stream, write_csv_file
)
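
# Typical usage (a hedged sketch; the S3 connection object and the bucket and
# file names are illustrative, not part of a documented API):
#
#   umf = AFCMUniverseMapFile(mosaic_filename="MOSAIC.csv")
#   umf.set_voterset_s3(s3_conn, "voterset-bucket")
#   umf.set_optout_s3(s3_conn, "optout-bucket")
#   umf.read_mosaic_map_file("universe_map.json")
#   umf.generate_mapped_file()
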
class AFCMUniverseMapFile:
    """
    Maps raw VoterSet CSV rows into the Amplify row format, removes rows with
    bad, opted-out, or duplicate phone numbers, and writes the prepped,
    removed, final mosaic, and stats files back to S3.
    """
    def __init__(self, mosaic_filename=None):
        """
        __init__
        :param mosaic_filename: Optional name of the mosaic file this map describes
        """
self.mosaic_filename = mosaic_filename
self.mosaic_head = {}
self.metadata = {}
self.mosaic_file_list = {}
self.mosaic_postprocess = {}
self.files = []
self.voterset_s3_connection = None
self.voterset_s3_bucket = None
self.optout_s3_connection = None
self.optout_s3_bucket = None
self.cleaned_import_rows = []
self.removed_row_count = 0
self.processed_row_count = 0
self.final_rows = []
# Watch for duplicates across files
self.running_phone_numbers = {}
        # Known-bad numbers (carrier errors and opt-outs) mapped to a reason string
        self.error_numbers = {}
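        # Per-file counts sit alongside a running "total" in each *_count
        # bucket; "removed_details" tallies removed rows by reason, and
        # "timings" records per-file stage durations (see process_file)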
self.stats = {
"incoming_count": { "total": 0 },
"removed_count": { "total": 0 },
"cleaned_count": { "total": 0 },
"mosaic_count": { "total": 0 },
"removed_details": {},
"timings": {}
}
def set_voterset_s3(self, s3_connection, bucket_name):
"""
set_voterset_s3 Set the S3 information for the 'voterset' bucket
:param s3_connection: S3 Connection object
:param bucket_name: Bucket name
"""
self.voterset_s3_connection = s3_connection
self.voterset_s3_bucket = bucket_name
def set_optout_s3(self, s3_connection, bucket_name):
"""
set_optout_s3 Set the S3 information for the 'optout' bucket
:param s3_connection: S3 Connection object
:param bucket_name: Bucket name
"""
self.optout_s3_connection = s3_connection
self.optout_s3_bucket = bucket_name
    def read_mosaic_map_file(self, filename):
        """
        read_mosaic_map_file Read a mosaic map JSON file and populate the
        mosaic, metadata, files, and postprocess sections
        :param filename: Path to the map JSON file
        :return: The parsed dict, or None if no filename was given
        """
        if not filename:
            return None
        with open(filename, encoding='utf-8-sig') as json_file:
            data = json.load(json_file)
if 'mosaic' in data:
self.mosaic_head = data['mosaic']
if 'metadata' in data:
self.metadata = data['metadata']
if 'files' in data:
self.mosaic_file_list = data['files']
if 'postprocess' in data:
self.mosaic_postprocess = data['postprocess']
return data
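    # Expected top-level shape of the map file, inferred from the keys read
    # above and from process_file (a sketch; exact metadata contents vary):
    # {
    #     "mosaic": {...},
    #     "metadata": {"s3_key": "...", "optout_s3_key": "...", "mosaic_filename": "..."},
    #     "files": {"voters.csv": {"mapping": {...}}},
    #     "postprocess": {"range": {"start": 0, "end": 10000}}
    # }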
    def read_mosaic_map_dict(self, map_dict: dict):
        """
        read_mosaic_map_dict Populate the map sections from an already-parsed dict
        :param map_dict: Dict with the same shape as the map JSON file
        """
        if not isinstance(map_dict, dict):
            return None
self.mosaic_head = map_dict.get('mosaic', {})
self.metadata = map_dict.get('metadata', {})
self.mosaic_file_list = map_dict.get('files', {})
self.mosaic_postprocess = map_dict.get('postprocess', {})
return None
    def write_mosaic_map_file(self, filename):
        """
        write_mosaic_map_file Write the current map sections out as a JSON file
        :param filename: Destination path for the map JSON file
        """
        if not filename:
            return None
output_object = json.dumps({"mosaic": self.mosaic_head,
"metadata": self.metadata,
"files": self.mosaic_file_list,
"postprocess": self.mosaic_postprocess}, indent=4)
with open(filename, 'w', encoding="UTF-8") as outfile:
outfile.write(output_object)
return None
    def create_new_input_file(self, filename=None):
        """
        create_new_input_file Build a template input-file entry for the map
        :param filename: Source CSV filename for this entry
        :return: The template dict
        """
mosaic_input_file = {
"filename": filename,
"details": {
"csv_get_url": None,
"csv_get_prepped_url": None,
"csv_get_removed_url": None,
"csv_put_url": None,
"mapping": {
"UID": None,
"FName": None,
"LName": None,
"Cell_Phone": None,
"Party": None,
"State": None,
"Precinct": None,
"County": None,
"URL": None
},
"operations": {
"insert": {"field":"*", "operator":"=", "value":"*"}
}
}
}
return mosaic_input_file
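    # Each mapping entry is interpreted by process_file as one of the
    # following (a sketch; the CSV column names are hypothetical):
    #   {"field": "CELLPHONE"}                            copy one CSV column
    #   {"combine": {"field1": "AREA", "field2": "NUM"}}  concatenate two columns
    #   {"value": "n/a"}                                  constant for every row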
    def add_new_input_file(self, ifo):
        """
        add_new_input_file Append an input-file object to the file list
        :param ifo: Input-file object, as returned by create_new_input_file
        """
        if ifo is None:
            return
self.files.append(ifo)
    def generate_mapped_file(self):
        """
        generate_mapped_file Run the full pipeline: load the error and optout
        numbers, process every mapped file, apply postprocessing, and write
        the final mosaic and stats files
        """
        try:
            errors_filename_s3_key = "errors.json"
            file_data_stream = self.optout_s3_connection.get_object(self.optout_s3_bucket,
                                                                    errors_filename_s3_key)
            self.error_numbers = json.load(codecs.getreader("utf-8")(file_data_stream))
        except Exception as ex:
            raise Exception(
                f"universe_map_file.generate_mapped_file: Failed to get "
                f"{self.optout_s3_bucket}/{errors_filename_s3_key}: {ex}") from ex
if "optout_s3_key" in self.metadata and self.metadata["optout_s3_key"] != None:
try:
file_data_stream = self.optout_s3_connection.get_object(self.optout_s3_bucket,
self.metadata["optout_s3_key"])
optout_numbers = json.load(codecs.getreader("utf-8")(file_data_stream))
self.error_numbers.update(optout_numbers)
except Exception as ex:
logging.info(f"Optout key not found, skipping {self.optout_s3_bucket}/{self.metadata['optout_s3_key']}")
# Loop the files from the map
for ufilename in self.mosaic_file_list:
self.process_file(ufilename)
# Perform post-processing
self.postprocessing()
# Run final steps
self.write_final_file()
self.write_stats_file()
def postprocessing(self):
"""
        postprocessing Apply any postprocessing rules defined in the voterset map file.
        Structure in the map file:
"postprocess": {
"range": {
"start": 0,
"end": 10000
}
}
"""
        # need to figure out how to handle rules in defined order
        # if no range is defined, this copies the entire list from self.cleaned_import_rows
        row_range = self.mosaic_postprocess.get('range', {})
        self.final_rows = self.cleaned_import_rows[row_range.get('start'):row_range.get('end')]
def process_file(self, voterset_filename):
"""
process_file Process the file of the VoterSet
:param voterset_filename: The VoterSet filename to process
"""
overall_time_start = time.time()
if 'mapping' not in self.mosaic_file_list[voterset_filename]:
logging.debug("Missing 'mapping' key, skipping %s", voterset_filename)
return None
mapping = self.mosaic_file_list[voterset_filename]['mapping']
if 'Cell_Phone' not in mapping:
logging.debug("'Cell_Phone' not mapped, skipping %s", voterset_filename)
return None
if 'field' not in mapping['Cell_Phone'] and 'combine' not in mapping['Cell_Phone']:
logging.debug("No field or combination mapped to 'Cell_Phone', skipping %s",
voterset_filename)
return None
# Faster & less memory to only read the keys we need from CSV
csv_keys_needed = []
for k,v in mapping.items():
if isinstance(v, dict):
if 'field' in v:
csv_keys_needed.append(v['field'])
elif 'combine' in v:
for k1,v1 in v['combine'].items():
if k1.startswith('field'):
csv_keys_needed.append(v1)
amplify_rows = []
removed_rows = []
reading_time_start = time.time()
        try:
            voterset_filename_s3_key = os.path.join(self.metadata.get("s3_key", ""), voterset_filename)
            file_data_stream = self.voterset_s3_connection.get_object(self.voterset_s3_bucket,
                                                                      voterset_filename_s3_key)
        except Exception as ex:
            raise Exception(
                f"universe_map_file.process_file: Failed to get "
                f"{self.voterset_s3_bucket}/{voterset_filename_s3_key}: {ex}") from ex
rows = read_voterdata_csv_stream(file_data_stream, csv_keys_needed)
reading_time = time.time() - reading_time_start
logging.debug("rows = %s", len(rows))
# Update incoming stats after reading this file map
self.stats["incoming_count"][voterset_filename] = len(rows)
self.stats["incoming_count"]["total"] += self.stats["incoming_count"][voterset_filename]
processing_time_start = time.time()
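        # 'Cell_Phone' may map to a single CSV column ('field') or to two
        # columns joined together ('combine' with field1 + field2, e.g. a
        # separate area-code column and a number column)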
        for i, r in enumerate(rows):
            if 'combine' in mapping['Cell_Phone']:
                cell_phone_num = (f"{r[mapping['Cell_Phone']['combine']['field1']]}"
                                  f"{r[mapping['Cell_Phone']['combine']['field2']]}")
            else:
                cell_phone_num = r[mapping['Cell_Phone']['field']]
            # Verify this row has a parseable phone number
            try:
                parsed_number = phonenumbers.parse(cell_phone_num, 'US')
            except Exception: # pylint: disable=broad-except
                reason = 'could not parse phone number'
                r['afc_error'] = f"bad row #{i} in {voterset_filename}; reason='{reason}'"
                removed_rows.append(r)
                self.stats["removed_details"][reason] = \
                    self.stats["removed_details"].get(reason, 0) + 1
                continue
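            # Validation pipeline: parse -> is_possible_number (length/shape
            # check) -> is_valid_number (numbering-plan check) -> E.164
            # formatting, e.g. '+14045551234' for a US number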
            # is possible phone number
            if phonenumbers.is_possible_number(parsed_number):
                possible_number = parsed_number
            else:
                reason = 'invalid phone number'
                r['afc_error'] = f"bad row #{i} in {voterset_filename}; reason='{reason}'"
                removed_rows.append(r)
                self.stats["removed_details"][reason] = \
                    self.stats["removed_details"].get(reason, 0) + 1
                continue
            # is valid phone number
            if phonenumbers.is_valid_number(possible_number):
                formatted_number = phonenumbers.format_number(possible_number,
                                                              phonenumbers.PhoneNumberFormat.E164)
            else:
                reason = 'invalid phone number'
                r['afc_error'] = f"bad row #{i} in {voterset_filename}; reason='{reason}'"
                removed_rows.append(r)
                self.stats["removed_details"][reason] = \
                    self.stats["removed_details"].get(reason, 0) + 1
                continue
            # look up carrier errors and optouts
            if formatted_number in self.error_numbers:
                reason = self.error_numbers[formatted_number]
                r['afc_error'] = f"bad row #{i} in {voterset_filename}; reason='{reason}'"
                removed_rows.append(r)
                self.stats["removed_details"][reason] = \
                    self.stats["removed_details"].get(reason, 0) + 1
                continue
            # last check, make sure we haven't added this number already
            if formatted_number in self.running_phone_numbers:
                reason = 'duplicate phone number'
                r['afc_error'] = f"bad row #{i} in {voterset_filename}; reason='{reason}'"
                removed_rows.append(r)
                self.stats["removed_details"][reason] = \
                    self.stats["removed_details"].get(reason, 0) + 1
                continue
new_row = {
'UID': 'n/a',
'FName': 'n/a',
'LName': 'n/a',
'Party': 'n/a',
'State': 'n/a',
'Precinct': 'n/a',
'County': 'n/a',
'URL': 'n/a'
}
            # Map each value to new_row; use .get so a key missing from the
            # mapping is skipped instead of raising KeyError
            for amplify_key in list(new_row):
                spec = mapping.get(amplify_key)
                if spec is None:
                    continue
                if 'value' in spec:
                    new_row[amplify_key] = spec['value']
                elif 'field' in spec:
                    old_key = spec['field']
                    if old_key != '' and old_key in r:
                        new_row[amplify_key] = r[old_key]
                else:
                    logging.error("Unrecognized mapping for %s: %s",
                                  amplify_key, spec)
if new_row['UID'] is None:
new_row['UID'] = str(uuid.uuid4())
if new_row['UID'].strip() in ['n/a', '']:
new_row['UID'] = str(uuid.uuid4())
new_row['Cell_Phone'] = formatted_number
amplify_rows.append(new_row)
self.cleaned_import_rows.append(new_row)
self.running_phone_numbers[formatted_number] = True
processing_time = time.time() - processing_time_start
(filename_prefix, extension) = os.path.splitext(voterset_filename)
# Need to write *_PREPPED.csv
writing_cleaned_time_start = time.time()
prep_file_name = f"{filename_prefix}_PREPPED.csv"
prep_full_pathname = f"/tmp/{prep_file_name}"
        if amplify_rows:
            try:
                write_csv_file(prep_full_pathname, amplify_rows[0].keys(), amplify_rows)
            except Exception as ex: # pylint: disable=broad-except
                logging.error("[WRITING %s] [%s]", prep_file_name, ex)
            else:
                # write S3 only if the local write succeeded
                this_s3_key = os.path.join(self.metadata.get("s3_key", ""), prep_file_name)
                self.voterset_s3_connection.upload_file(filepathname=prep_full_pathname,
                                                        bucket=self.voterset_s3_bucket,
                                                        key=this_s3_key)
                # remove the temp file now
                os.remove(prep_full_pathname)
writing_cleaned_time = time.time() - writing_cleaned_time_start
# Need to write *_REMOVED.csv
writing_removed_time_start = time.time()
removed_file_name = f"{filename_prefix}_REMOVED.csv"
removed_full_pathname = f"/tmp/{removed_file_name}"
        if removed_rows:
            try:
                write_csv_file(removed_full_pathname, removed_rows[0].keys(), removed_rows)
            except Exception as ex: # pylint: disable=broad-except
                logging.error("[WRITING %s] [%s]", removed_file_name, ex)
            else:
                # write S3 only if the local write succeeded
                this_s3_key = os.path.join(self.metadata.get("s3_key", ""), removed_file_name)
                self.voterset_s3_connection.upload_file(filepathname=removed_full_pathname,
                                                        bucket=self.voterset_s3_bucket,
                                                        key=this_s3_key)
                # remove the temp file now
                os.remove(removed_full_pathname)
writing_removed_time = time.time() - writing_removed_time_start
self.removed_row_count += len(removed_rows)
self.processed_row_count += len(rows)
# Update stats for this file
self.stats['cleaned_count'][voterset_filename] = len(amplify_rows)
self.stats['cleaned_count']["total"] += self.stats["cleaned_count"][voterset_filename]
self.stats['removed_count'][voterset_filename] = len(removed_rows)
self.stats['removed_count']["total"] += self.stats["removed_count"][voterset_filename]
overall_time = time.time() - overall_time_start
self.stats["timings"][voterset_filename] = {
"reading": reading_time,
"processing": processing_time,
"writing cleaned file": writing_cleaned_time,
"writing removed file": writing_removed_time,
"file total": overall_time
}
return None
def write_final_file(self):
"""
write_final_file Writes the final CSV to S3
"""
# Set the count of final voters
self.metadata["total_rows"] = len(self.final_rows)
# Need to write the FINAL file
final_filename = self.metadata.get('mosaic_filename', 'MOSAIC.csv')
        if self.final_rows:
final_full_pathname = f"/tmp/{final_filename}"
write_csv_file(final_full_pathname, self.final_rows[0].keys(), self.final_rows)
# write S3
this_s3_key = os.path.join(self.metadata.get("s3_key", ""), final_filename)
self.voterset_s3_connection.upload_file(filepathname=final_full_pathname,
bucket=self.voterset_s3_bucket,
key=this_s3_key)
# remove the temp file now
os.remove(final_full_pathname)
self.stats['mosaic_count'][final_filename] = len(self.final_rows)
self.stats['mosaic_count']["total"] = self.stats["mosaic_count"][final_filename]
def write_stats_file(self):
"""
write_stats_file Writes the stats JSON to S3
"""
# Set the count of stat data
self.metadata["cleaned_count"] = self.stats['cleaned_count']["total"]
self.metadata["total_removed"] = self.stats['removed_count']["total"]
# Write the STATS file
stats_filename = "mosaic_stats.json"
s3_key = os.path.join(self.metadata.get("s3_key", ""), stats_filename)
        json_data = json.dumps(self.stats, indent=4).encode('UTF-8')
self.voterset_s3_connection.put_object(bucket=self.voterset_s3_bucket,
key=s3_key,
data=json_data)
def get_mosaic_map_dict(self) -> dict:
"""
get_mosaic_map_dict Generate the dictionary for the VoterSet mosaic map json
        :return: A generated dict
"""
return {
"mosaic": self.mosaic_head,
"metadata": self.metadata,
"files": self.mosaic_file_list,
"postprocess": self.mosaic_postprocess
}
    def get_voter_object(self, voterset_key: str, business_key: str) -> dict:
        """
        get_voter_object Build the AF-VOTER API object for a voter
        :param voterset_key: Voterset key of the voter
        :param business_key: Business key of the voter
        :return: Dictionary object of the AF-VOTER, or None if there are no final rows
        """
        # Check if there is data
        if len(self.final_rows) < 1:
            return None
        # Build the AF Voter object from the first final row; every column
        # outside the core identity fields is carried along in data_fields
        voter_row = self.final_rows[0]
        filter_set = {'UID', 'FName', 'LName', 'Cell_Phone'}
        data_fields_dict = {k: v for k, v in voter_row.items() if k not in filter_set}
voter_dict = {
"voter_id": voter_row['UID'],
"first_name": voter_row['FName'],
"last_name": voter_row['LName'],
"voterset_keys": [voterset_key],
"business_key": business_key,
"data_fields": data_fields_dict,
"cell_phone": voter_row['Cell_Phone']
}
return voter_dict
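
# Example call (a sketch; the key values are hypothetical):
#   voter = umf.get_voter_object("voterset-key-123", "business-key-456")
#   # -> {"voter_id": ..., "first_name": ..., "cell_phone": "+1...",
#   #     "voterset_keys": ["voterset-key-123"], "data_fields": {...}, ...}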