"""
AFCMUniverseMapFile Class object
"""

import json
import os
import logging
import uuid
import time
import codecs

import phonenumbers

from lib_afc_mosaic.helpers import (
    read_voterdata_csv_stream, write_csv_file
)


class AFCMUniverseMapFile:
    """
    AFCMUniverseMapFile class

    Reads a VoterSet universe map (JSON), cleans and de-duplicates the voter
    CSV files it references, and writes the prepped, removed, final, and stats
    files back to S3.
    """

    def __init__(self, mosaic_filename=None):
        """
        __init__

        :param mosaic_filename: Optional mosaic filename to associate with this map
        """

        self.mosaic_filename = mosaic_filename

        self.mosaic_head = {}
        self.metadata = {}
        self.mosaic_file_list = {}
        self.mosaic_postprocess = {}

        self.files = []

        self.voterset_s3_connection = None
        self.voterset_s3_bucket = None
        self.optout_s3_connection = None
        self.optout_s3_bucket = None

        self.cleaned_import_rows = []
        self.removed_row_count = 0
        self.processed_row_count = 0
        self.final_rows = []

        # Watch for duplicates across files
        self.running_phone_numbers = {}

        # Track error numbers
        self.error_numbers = {}

        self.stats = {
            "incoming_count": {"total": 0},
            "removed_count": {"total": 0},
            "cleaned_count": {"total": 0},
            "mosaic_count": {"total": 0},
            "removed_details": {},
            "timings": {}
        }

    def set_voterset_s3(self, s3_connection, bucket_name):
        """
        set_voterset_s3 Set the S3 information for the 'voterset' bucket

        :param s3_connection: S3 Connection object
        :param bucket_name: Bucket name
        """

        self.voterset_s3_connection = s3_connection
        self.voterset_s3_bucket = bucket_name

    def set_optout_s3(self, s3_connection, bucket_name):
        """
        set_optout_s3 Set the S3 information for the 'optout' bucket

        :param s3_connection: S3 Connection object
        :param bucket_name: Bucket name
        """

        self.optout_s3_connection = s3_connection
        self.optout_s3_bucket = bucket_name
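
    # A minimal usage sketch (illustrative; assumes an S3 wrapper object that
    # exposes the get_object / upload_file / put_object methods used below):
    #
    #     umf = AFCMUniverseMapFile()
    #     umf.read_mosaic_map_file("universe_map.json")
    #     umf.set_voterset_s3(s3_conn, "voterset-bucket")
    #     umf.set_optout_s3(s3_conn, "optout-bucket")
    #     umf.generate_mapped_file()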

    def read_mosaic_map_file(self, filename):
        """
        read_mosaic_map_file Read a universe map JSON file and load its sections

        :param filename: Pathname of the map file to read
        :return: The parsed map dict, or None if no filename was given
        """

        if not filename:
            return None

        with open(filename, encoding='utf-8-sig') as json_file:
            data = json.load(json_file)

        if 'mosaic' in data:
            self.mosaic_head = data['mosaic']

        if 'metadata' in data:
            self.metadata = data['metadata']

        if 'files' in data:
            self.mosaic_file_list = data['files']

        if 'postprocess' in data:
            self.mosaic_postprocess = data['postprocess']

        return data
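
    # For reference, a hedged sketch of the map file shape this class consumes,
    # inferred from the keys read above and used in process_file(); field names
    # under "mapping" follow create_new_input_file below, and "Cell_Phone" may
    # instead use {"combine": {"field1": ..., "field2": ...}}:
    #
    #     {
    #         "mosaic": {...},
    #         "metadata": {"s3_key": "path/in/bucket", "mosaic_filename": "MOSAIC.csv"},
    #         "files": {
    #             "voters.csv": {
    #                 "mapping": {
    #                     "Cell_Phone": {"field": "cell"},
    #                     "FName": {"field": "first_name"},
    #                     "State": {"value": "PA"}
    #                 }
    #             }
    #         },
    #         "postprocess": {"range": {"start": 0, "end": 10000}}
    #     }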

    def read_mosaic_map_dict(self, map_dict: dict):
        """
        read_mosaic_map_dict Load the universe map sections from a parsed dict

        :param map_dict: Parsed universe map dictionary
        """

        if not isinstance(map_dict, dict):
            return None

        self.mosaic_head = map_dict.get('mosaic', {})
        self.metadata = map_dict.get('metadata', {})
        self.mosaic_file_list = map_dict.get('files', {})
        self.mosaic_postprocess = map_dict.get('postprocess', {})

        return None

    def write_mosaic_map_file(self, filename):
        """
        write_mosaic_map_file Write the universe map sections to a JSON file

        :param filename: Pathname of the map file to write
        """

        if not filename:
            return None

        output_object = json.dumps({"mosaic": self.mosaic_head,
                                    "metadata": self.metadata,
                                    "files": self.mosaic_file_list,
                                    "postprocess": self.mosaic_postprocess}, indent=4)

        with open(filename, 'w', encoding="UTF-8") as outfile:
            outfile.write(output_object)

        return None
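
    # Round-trip sketch (values illustrative): load a map, adjust a section,
    # and save it back.
    #
    #     umf = AFCMUniverseMapFile()
    #     umf.read_mosaic_map_file("universe_map.json")
    #     umf.metadata["s3_key"] = "campaigns/2024-01"
    #     umf.write_mosaic_map_file("universe_map.json")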

    def create_new_input_file(self, filename=None):
        """
        create_new_input_file Build an empty input-file entry with default mapping keys

        :param filename: Filename for the new entry
        :return: The new input-file dict
        """

        mosaic_input_file = {
            "filename": filename,
            "details": {
                "csv_get_url": None,
                "csv_get_prepped_url": None,
                "csv_get_removed_url": None,
                "csv_put_url": None,
                "mapping": {
                    "UID": None,
                    "FName": None,
                    "LName": None,
                    "Cell_Phone": None,
                    "Party": None,
                    "State": None,
                    "Precinct": None,
                    "County": None,
                    "URL": None
                },
                "operations": {
                    "insert": {"field": "*", "operator": "=", "value": "*"}
                }
            }
        }

        return mosaic_input_file

    def add_new_input_file(self, ifo):
        """
        add_new_input_file Append an input-file entry to the list of files

        :param ifo: Input-file dict, as returned by create_new_input_file
        """

        if ifo is None:
            return
        self.files.append(ifo)
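
    # Sketch: create an entry, point its mapping at the CSV's columns, and
    # register it (the column names here are hypothetical):
    #
    #     ifo = umf.create_new_input_file("voters.csv")
    #     ifo["details"]["mapping"]["Cell_Phone"] = {"field": "cell"}
    #     ifo["details"]["mapping"]["FName"] = {"field": "first_name"}
    #     umf.add_new_input_file(ifo)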

    def generate_mapped_file(self):
        """
        generate_mapped_file Generate the mapped file details
        """

        # Load the known-bad numbers (carrier errors and opt-outs) first
        try:
            errors_filename_s3_key = "errors.json"
            file_data_stream = self.optout_s3_connection.get_object(self.optout_s3_bucket,
                                                                    errors_filename_s3_key)
            self.error_numbers = json.load(codecs.getreader("utf-8")(file_data_stream))
        except Exception as ex:
            raise Exception(f"universe_map_file.generate_mapped_file: Failed to get {self.optout_s3_bucket}/errors.json: {ex}") from ex

        # Loop the files from the map
        for ufilename in self.mosaic_file_list:
            self.process_file(ufilename)

        # Perform post-processing
        self.postprocessing()

        # Run final steps
        self.write_final_file()
        self.write_stats_file()
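
    # errors.json is read into a plain dict; from its use in process_file it
    # maps an E.164 number to a removal-reason string. A hedged sketch (the
    # exact reason strings may vary):
    #
    #     {"+12155550123": "carrier error", "+12155550124": "optout"}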

    def postprocessing(self):
        """
        postprocessing Performs actions for any postprocessing rules defined in the voterset map file

        Structure in the map file:

            "postprocess": {
                "range": {
                    "start": 0,
                    "end": 10000
                }
            }
        """

        # Need to figure out how to handle rules in a defined order

        # If no range is defined, this copies the entire list from self.cleaned_import_rows
        row_range = self.mosaic_postprocess.get('range', {})
        self.final_rows = self.cleaned_import_rows[row_range.get('start', None):row_range.get('end', None)]

        return
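
    # Slicing semantics sketch: with "range": {"start": 0, "end": 10000} the
    # final universe keeps the first 10,000 cleaned rows; either bound may be
    # omitted, since slicing with None leaves that end open:
    #
    #     rows = list(range(5))
    #     rows[None:3]   # -> [0, 1, 2]
    #     rows[2:None]   # -> [2, 3, 4]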

    def process_file(self, voterset_filename):
        """
        process_file Process one file of the VoterSet

        :param voterset_filename: The VoterSet filename to process
        """

        overall_time_start = time.time()

        if 'mapping' not in self.mosaic_file_list[voterset_filename]:
            logging.debug("Missing 'mapping' key, skipping %s", voterset_filename)
            return None

        mapping = self.mosaic_file_list[voterset_filename]['mapping']

        if 'Cell_Phone' not in mapping or mapping['Cell_Phone'] is None:
            logging.debug("'Cell_Phone' not mapped, skipping %s", voterset_filename)
            return None
        if 'field' not in mapping['Cell_Phone'] and 'combine' not in mapping['Cell_Phone']:
            logging.debug("No field or combination mapped to 'Cell_Phone', skipping %s",
                          voterset_filename)
            return None

        # Faster & less memory to only read the keys we need from CSV
        csv_keys_needed = []
        for v in mapping.values():
            if isinstance(v, dict):
                if 'field' in v:
                    csv_keys_needed.append(v['field'])
                elif 'combine' in v:
                    for k1, v1 in v['combine'].items():
                        if k1.startswith('field'):
                            csv_keys_needed.append(v1)

        amplify_rows = []
        removed_rows = []

        reading_time_start = time.time()
        try:
            voterset_filename_s3_key = os.path.join(self.metadata.get("s3_key", ""), voterset_filename)
            file_data_stream = self.voterset_s3_connection.get_object(self.voterset_s3_bucket,
                                                                      voterset_filename_s3_key)
        except Exception as ex:
            raise Exception(f"universe_map_file.process_file: Failed to get {self.voterset_s3_bucket}/{voterset_filename}: {ex}") from ex

        rows = read_voterdata_csv_stream(file_data_stream, csv_keys_needed)
        reading_time = time.time() - reading_time_start
        logging.debug("rows = %s", len(rows))

        # Update incoming stats after reading this file map
        self.stats["incoming_count"][voterset_filename] = len(rows)
        self.stats["incoming_count"]["total"] += self.stats["incoming_count"][voterset_filename]

        processing_time_start = time.time()
        for i, r in enumerate(rows):

            if 'combine' in mapping['Cell_Phone']:
                cell_phone_num = f"{r[mapping['Cell_Phone']['combine']['field1']]}{r[mapping['Cell_Phone']['combine']['field2']]}"
            else:
                cell_phone_num = r[mapping['Cell_Phone']['field']]

            # Evaluate whether this row has a parseable phone number
            try:
                parsed_number = phonenumbers.parse(cell_phone_num, 'US')
            except Exception:  # pylint: disable=broad-except
                reason = "could not parse phone number"
                r['afc_error'] = f"bad row #{i} in {voterset_filename}; reason='{reason}'"
                removed_rows.append(r)
                self.stats["removed_details"][reason] = self.stats["removed_details"].get(reason, 0) + 1
                continue

            # Is it a possible phone number?
            if phonenumbers.is_possible_number(parsed_number):
                possible_number = parsed_number
            else:
                reason = "invalid phone number"
                r['afc_error'] = f"bad row #{i} in {voterset_filename}; reason='{reason}'"
                removed_rows.append(r)
                self.stats["removed_details"][reason] = self.stats["removed_details"].get(reason, 0) + 1
                continue

            # Is it a valid phone number?
            if phonenumbers.is_valid_number(possible_number):
                formatted_number = phonenumbers.format_number(possible_number,
                                                              phonenumbers.PhoneNumberFormat.E164)
            else:
                reason = "invalid phone number"
                r['afc_error'] = f"bad row #{i} in {voterset_filename}; reason='{reason}'"
                removed_rows.append(r)
                self.stats["removed_details"][reason] = self.stats["removed_details"].get(reason, 0) + 1
                continue

            # Look up carrier errors and opt-outs
            if formatted_number in self.error_numbers:
                reason = self.error_numbers[formatted_number]
                r['afc_error'] = f"bad row #{i} in {voterset_filename}; reason='{reason}'"
                removed_rows.append(r)
                self.stats["removed_details"][reason] = self.stats["removed_details"].get(reason, 0) + 1
                continue

            # Last check: make sure we haven't added this number already
            if formatted_number in self.running_phone_numbers:
                reason = "duplicate phone number"
                r['afc_error'] = f"bad row #{i} in {voterset_filename}; reason='{reason}'"
                removed_rows.append(r)
                self.stats["removed_details"][reason] = self.stats["removed_details"].get(reason, 0) + 1
                continue

            new_row = {
                'UID': 'n/a',
                'FName': 'n/a',
                'LName': 'n/a',
                'Party': 'n/a',
                'State': 'n/a',
                'Precinct': 'n/a',
                'County': 'n/a',
                'URL': 'n/a'
            }

            # Map each value to new_row
            for amplify_key in list(new_row):
                if mapping.get(amplify_key) is None:
                    continue
                if 'value' in mapping[amplify_key]:
                    new_row[amplify_key] = mapping[amplify_key]['value']
                elif 'field' in mapping[amplify_key]:
                    old_key = mapping[amplify_key]['field']
                    if old_key != '' and old_key in r:
                        new_row[amplify_key] = r[old_key]
                else:
                    logging.error("Unrecognized mapping for %s: %s",
                                  amplify_key,
                                  mapping[amplify_key])

            # Generate a UID when none was mapped
            if new_row['UID'] is None:
                new_row['UID'] = str(uuid.uuid4())

            if new_row['UID'].strip() in ['n/a', '']:
                new_row['UID'] = str(uuid.uuid4())

            new_row['Cell_Phone'] = formatted_number

            amplify_rows.append(new_row)
            self.cleaned_import_rows.append(new_row)
            self.running_phone_numbers[formatted_number] = True

        processing_time = time.time() - processing_time_start

        filename_prefix, _ = os.path.splitext(voterset_filename)

        # Need to write *_PREPPED.csv
        writing_cleaned_time_start = time.time()
        prep_file_name = f"{filename_prefix}_PREPPED.csv"
        prep_full_pathname = f"/tmp/{prep_file_name}"
        if len(amplify_rows) > 0:
            try:
                write_csv_file(prep_full_pathname, amplify_rows[0].keys(), amplify_rows)
            except Exception as ex:  # pylint: disable=broad-except
                logging.error("[WRITING %s] [%s]", prep_file_name, str(ex))

            # write S3
            this_s3_key = os.path.join(self.metadata.get("s3_key", ""), prep_file_name)
            self.voterset_s3_connection.upload_file(filepathname=prep_full_pathname,
                                                    bucket=self.voterset_s3_bucket,
                                                    key=this_s3_key)

            # remove the temp file now
            os.remove(prep_full_pathname)
        writing_cleaned_time = time.time() - writing_cleaned_time_start

        # Need to write *_REMOVED.csv
        writing_removed_time_start = time.time()
        removed_file_name = f"{filename_prefix}_REMOVED.csv"
        removed_full_pathname = f"/tmp/{removed_file_name}"
        if len(removed_rows) > 0:
            try:
                write_csv_file(removed_full_pathname, removed_rows[0].keys(), removed_rows)
            except Exception as ex:  # pylint: disable=broad-except
                logging.error("[WRITING %s] [%s]", removed_file_name, str(ex))

            # write S3
            this_s3_key = os.path.join(self.metadata.get("s3_key", ""), removed_file_name)
            self.voterset_s3_connection.upload_file(filepathname=removed_full_pathname,
                                                    bucket=self.voterset_s3_bucket,
                                                    key=this_s3_key)

            # remove the temp file now
            os.remove(removed_full_pathname)
        writing_removed_time = time.time() - writing_removed_time_start

        self.removed_row_count += len(removed_rows)
        self.processed_row_count += len(rows)

        # Update stats for this file
        self.stats['cleaned_count'][voterset_filename] = len(amplify_rows)
        self.stats['cleaned_count']["total"] += self.stats["cleaned_count"][voterset_filename]

        self.stats['removed_count'][voterset_filename] = len(removed_rows)
        self.stats['removed_count']["total"] += self.stats["removed_count"][voterset_filename]

        overall_time = time.time() - overall_time_start

        self.stats["timings"][voterset_filename] = {
            "reading": reading_time,
            "processing": processing_time,
            "writing cleaned file": writing_cleaned_time,
            "writing removed file": writing_removed_time,
            "file total": overall_time
        }

        return None
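
    # The phone-cleaning pipeline used above, as a standalone sketch. These are
    # real phonenumbers calls; the sample number itself is made up:
    #
    #     import phonenumbers
    #     parsed = phonenumbers.parse("215-555-0123", "US")
    #     if phonenumbers.is_possible_number(parsed) and phonenumbers.is_valid_number(parsed):
    #         e164 = phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)
    #         # e164 -> "+12155550123"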

    def write_final_file(self):
        """
        write_final_file Writes the final CSV to S3
        """

        # Set the count of final voters
        self.metadata["total_rows"] = len(self.final_rows)

        # Need to write the FINAL file
        final_filename = self.metadata.get('mosaic_filename', 'MOSAIC.csv')
        if len(self.final_rows) > 0:
            final_full_pathname = f"/tmp/{final_filename}"
            write_csv_file(final_full_pathname, self.final_rows[0].keys(), self.final_rows)

            # write S3
            this_s3_key = os.path.join(self.metadata.get("s3_key", ""), final_filename)
            self.voterset_s3_connection.upload_file(filepathname=final_full_pathname,
                                                    bucket=self.voterset_s3_bucket,
                                                    key=this_s3_key)

            # remove the temp file now
            os.remove(final_full_pathname)

        self.stats['mosaic_count'][final_filename] = len(self.final_rows)
        self.stats['mosaic_count']["total"] = self.stats["mosaic_count"][final_filename]

    def write_stats_file(self):
        """
        write_stats_file Writes the stats JSON to S3
        """

        # Set the count of stat data
        self.metadata["cleaned_count"] = self.stats['cleaned_count']["total"]
        self.metadata["total_removed"] = self.stats['removed_count']["total"]

        # Write the STATS file
        stats_filename = "mosaic_stats.json"
        s3_key = os.path.join(self.metadata.get("s3_key", ""), stats_filename)
        json_data = json.dumps(self.stats, indent=4).encode('UTF-8')
        self.voterset_s3_connection.put_object(bucket=self.voterset_s3_bucket,
                                               key=s3_key,
                                               data=json_data)

    def get_mosaic_map_dict(self) -> dict:
        """
        get_mosaic_map_dict Generate the dictionary for the VoterSet mosaic map json

        :return: A generated dict
        """
        return {
            "mosaic": self.mosaic_head,
            "metadata": self.metadata,
            "files": self.mosaic_file_list,
            "postprocess": self.mosaic_postprocess
        }

    def get_voter_object(self, voterset_key: str, campaign_key: str) -> dict:
        """
        get_voter_object Get the voter as the API object

        :param voterset_key: Voterset key of the voter
        :param campaign_key: Campaign key of the voter
        :return: Dictionary object of the AF-VOTER, or None if there is no data
        """

        # Check if there is data
        if len(self.final_rows) < 1:
            return None

        # Build the AF Voter object from the first final row
        voter_row = self.final_rows[0]

        filter_set = {'UID', 'FName', 'LName', 'Cell_Phone'}
        data_fields_dict = {k: voter_row[k] for k in voter_row if k not in filter_set}

        voter_dict = {
            "voter_id": voter_row['UID'],
            "first_name": voter_row['FName'],
            "last_name": voter_row['LName'],
            "voterset_keys": [voterset_key],
            "campaign_key": campaign_key,
            "data_fields": data_fields_dict,
            "cell_phone": voter_row['Cell_Phone']
        }

        return voter_dict
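
    # Illustrative result shape (keys come from the code above; values here
    # are hypothetical):
    #
    #     umf.get_voter_object("vs-123", "camp-456")
    #     # -> {"voter_id": "...", "first_name": "...", "last_name": "...",
    #     #     "voterset_keys": ["vs-123"], "campaign_key": "camp-456",
    #     #     "data_fields": {"Party": "...", "State": "...", ...},
    #     #     "cell_phone": "+1..."}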