move generate map file to mosaic

parent a1f93a63e9
commit f0d40e0a70

@@ -0,0 +1,41 @@
"""
|
||||
Helper functions for the Mosaic library
|
||||
"""
|
||||
|
||||
import csv
|
||||
import codecs
|
||||
|
||||
def read_voterdata_csv_stream(input_stream) -> list:
|
||||
"""
|
||||
read_voterdata_csv_stream: Read csv stream to list
|
||||
|
||||
:param input_stream: Input stream to read from
|
||||
|
||||
:return List of parsed csv rows
|
||||
"""
|
||||
|
||||
rows = []
|
||||
# note: using 'utf-8-sig' removes the annoying \ufeff BOM signature
|
||||
reader = csv.DictReader(codecs.getreader("utf-8-sig")(input_stream))
|
||||
for row in reader:
|
||||
key_list = row.keys()
|
||||
new_row = {}
|
||||
for k in key_list:
|
||||
new_row[k] = row[k]
|
||||
rows.append(new_row)
|
||||
|
||||
return rows
|
||||
|
||||
|
||||
def write_csv_file(full_file_pathname, fieldnames, rows):
|
||||
"""
|
||||
write_csv_file
|
||||
|
||||
:param full_file_pathname:
|
||||
:param fieldnames:
|
||||
"""
|
||||
with open(full_file_pathname, 'w', newline='', encoding='UTF-8') as csvfile:
|
||||
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
|
||||
writer.writeheader()
|
||||
for row in rows:
|
||||
writer.writerow(row)
|
||||
|
|
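
A minimal round-trip sketch of these two helpers; the stream contents, field names, and output path below are hypothetical, not part of the library:

import io

# Hypothetical input: a two-column CSV delivered as a binary stream
sample_stream = io.BytesIO(b"First_Name,Phone\nPat,5105550100\n")
rows = read_voterdata_csv_stream(sample_stream)
# rows == [{'First_Name': 'Pat', 'Phone': '5105550100'}]
write_csv_file("/tmp/sample_out.csv", rows[0].keys(), rows)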

@@ -3,6 +3,15 @@ AFCMUniverseMapFile Class object
"""

import json
import os
import logging
import uuid

import phonenumbers

from lib_afc_mosiac.helpers import (
    read_voterdata_csv_stream, write_csv_file
)


class AFCMUniverseMapFile:

@@ -25,6 +34,52 @@ class AFCMUniverseMapFile:

        self.files = []

        self.voterset_s3_connection = None
        self.voterset_s3_bucket = None
        self.optout_s3_connection = None
        self.optout_s3_bucket = None

        self.final_rows = []
        self.removed_row_count = 0
        self.processed_row_count = 0

        # Watch for duplicates across files
        self.running_phone_numbers = {}

        # Track error numbers
        self.error_numbers = {}

        self.stats = {
            "incoming_count": {"total": 0},
            "removed_count": {"total": 0},
            "cleaned_count": {"total": 0},
            "removed_details": {}
        }
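        # For illustration only, after processing one file the stats might
        # look like this (file name and counts hypothetical):
        #   {
        #       "incoming_count": {"total": 120, "voters_a.csv": 120},
        #       "removed_count": {"total": 5, "voters_a.csv": 5},
        #       "cleaned_count": {"total": 115, "voters_a.csv": 115},
        #       "removed_details": {"invalid phone number": 3,
        #                           "duplicate phone number": 2}
        #   }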

    def set_voterset_s3(self, s3_connection, bucket_name):
        """
        set_voterset_s3: Set the S3 information for the 'voterset' bucket.

        :param s3_connection: S3 connection object
        :param bucket_name: Bucket name
        """
        self.voterset_s3_connection = s3_connection
        self.voterset_s3_bucket = bucket_name

    def set_optout_s3(self, s3_connection, bucket_name):
        """
        set_optout_s3: Set the S3 information for the 'optout' bucket.

        :param s3_connection: S3 connection object
        :param bucket_name: Bucket name
        """
        self.optout_s3_connection = s3_connection
        self.optout_s3_bucket = bucket_name

    def read_mosaic_map_file(self, filename):
        """

@@ -123,3 +178,225 @@ class AFCMUniverseMapFile:
        if ifo is None:
            return
        self.files.append(ifo)

    def generate_mapped_file(self):
        """
        generate_mapped_file: Generate the mapped file details.
        """
        # Loop the files from the map
        for ufilename in self.mosaic_file_list:
            self.process_file(ufilename)

        # Run final steps
        self.write_final_file()
        self.write_stats_file()

    def process_file(self, voterset_filename):
        """
        process_file: Process a single VoterSet file.

        :param voterset_filename: The VoterSet filename to process
        """
        amplify_rows = []
        removed_rows = []

        voterset_filename_s3_key = os.path.join(self.metadata.get("s3_key", ""), voterset_filename)
        file_data_stream = self.voterset_s3_connection.get_object(self.voterset_s3_bucket,
                                                                  voterset_filename_s3_key)
        rows = read_voterdata_csv_stream(file_data_stream)
        logging.debug("rows = %s", len(rows))

        mapping = self.mosaic_file_list[voterset_filename]['mapping']

        if 'Cell_Phone' not in mapping:
            logging.debug("'Cell_Phone' not mapped, skipping %s", voterset_filename)
            return None
        if 'field' not in mapping['Cell_Phone'] and 'combine' not in mapping['Cell_Phone']:
            logging.debug("No field or combination mapped to 'Cell_Phone', skipping %s",
                          voterset_filename)
            return None

        # Update incoming stats after reading this file map
        self.stats["incoming_count"][voterset_filename] = len(rows)
        self.stats["incoming_count"]["total"] += self.stats["incoming_count"][voterset_filename]
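
        # For illustration, a 'Cell_Phone' mapping entry consumed by the loop
        # below can take either form (the source column names are hypothetical):
        #   {'field': 'Phone'}                                       # one column
        #   {'combine': {'field1': 'AreaCode', 'field2': 'Number'}}  # two columns joined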
        for i, r in enumerate(rows):

            if 'combine' in mapping['Cell_Phone']:
                cell_phone_num = (f"{r[mapping['Cell_Phone']['combine']['field1']]}"
                                  f"{r[mapping['Cell_Phone']['combine']['field2']]}")
            else:
                cell_phone_num = r[mapping['Cell_Phone']['field']]

            # Check that this row's phone number can be parsed at all
            try:
                parsed_number = phonenumbers.parse(cell_phone_num, 'US')
            except Exception:  # pylint: disable=broad-exception-caught
                r['reason'] = f"bad row #{i} in {voterset_filename} reason='could not parse phone number'"
                removed_rows.append(r)
                self.stats["removed_details"]["could not parse phone number"] = \
                    self.stats["removed_details"].get("could not parse phone number", 0) + 1
                continue

            # Is it a possible phone number?
            if phonenumbers.is_possible_number(parsed_number):
                possible_number = parsed_number
            else:
                r['reason'] = f"bad row #{i} in {voterset_filename} reason='invalid phone number'"
                removed_rows.append(r)
                self.stats["removed_details"]["invalid phone number"] = \
                    self.stats["removed_details"].get("invalid phone number", 0) + 1
                continue

            # Is it a valid phone number?
            if phonenumbers.is_valid_number(possible_number):
                formatted_number = phonenumbers.format_number(possible_number,
                                                              phonenumbers.PhoneNumberFormat.E164)
            else:
                r['reason'] = f"bad row #{i} in {voterset_filename} reason='invalid phone number'"
                removed_rows.append(r)
                self.stats["removed_details"]["invalid phone number"] = \
                    self.stats["removed_details"].get("invalid phone number", 0) + 1
                continue

            # Look up carrier errors and opt-outs
            if formatted_number in self.error_numbers:
                r['reason'] = f"bad row #{i} in {voterset_filename} reason='{self.error_numbers[formatted_number]}'"
                removed_rows.append(r)
                self.stats["removed_details"][self.error_numbers[formatted_number]] = \
                    self.stats["removed_details"].get(self.error_numbers[formatted_number], 0) + 1
                continue

            # Last check: make sure we haven't already added this number
            if formatted_number in self.running_phone_numbers:
                r['reason'] = f"bad row #{i} in {voterset_filename} reason='duplicate phone number'"
                removed_rows.append(r)
                self.stats["removed_details"]["duplicate phone number"] = \
                    self.stats["removed_details"].get("duplicate phone number", 0) + 1
                continue

            new_row = {
                'UID': 'n/a',
                'FName': 'n/a',
                'LName': 'n/a',
                'Party': 'n/a',
                'State': 'n/a',
                'Precinct': 'n/a',
                'County': 'n/a',
                'URL': 'n/a'
            }

            # Map each value into new_row
            for amplify_key in list(new_row):
                if mapping[amplify_key] is None:
                    continue
                if 'value' in mapping[amplify_key]:
                    new_row[amplify_key] = mapping[amplify_key]['value']
                elif 'field' in mapping[amplify_key]:
                    old_key = mapping[amplify_key]['field']
                    if old_key != '' and old_key in r:
                        new_row[amplify_key] = r[old_key]
                else:
                    logging.error("Unrecognized mapping for %s: %s",
                                  amplify_key,
                                  mapping[amplify_key])

            if new_row['UID'] is None or new_row['UID'] == 'n/a':
                new_row['UID'] = str(uuid.uuid4())

            new_row['Cell_Phone'] = formatted_number

            # Record the number so the duplicate check above can catch repeats
            self.running_phone_numbers[formatted_number] = True

            amplify_rows.append(new_row)
            self.final_rows.append(new_row)

        (filename_prefix, extension) = os.path.splitext(voterset_filename)

        # Need to write *_PREPPED.csv
        prep_file_name = f"{filename_prefix}_PREPPED.csv"
        prep_full_pathname = f"/tmp/{prep_file_name}"
        if len(amplify_rows) > 0:
            write_csv_file(prep_full_pathname, amplify_rows[0].keys(), amplify_rows)

            # Write to S3
            this_s3_key = os.path.join(self.metadata.get("s3_key", ""), prep_file_name)
            self.voterset_s3_connection.upload_file(filepathname=prep_full_pathname,
                                                    bucket=self.voterset_s3_bucket,
                                                    key=this_s3_key)

            # Remove the temp file now
            os.remove(prep_full_pathname)

        # Need to write *_REMOVED.csv
        removed_file_name = f"{filename_prefix}_REMOVED.csv"
        removed_full_pathname = f"/tmp/{removed_file_name}"
        if len(removed_rows) > 0:
            # Write the removed rows (not the cleaned ones) to this file
            write_csv_file(removed_full_pathname, removed_rows[0].keys(), removed_rows)

            # Write to S3
            this_s3_key = os.path.join(self.metadata.get("s3_key", ""), removed_file_name)
            self.voterset_s3_connection.upload_file(filepathname=removed_full_pathname,
                                                    bucket=self.voterset_s3_bucket,
                                                    key=this_s3_key)

            # Remove the temp file now
            os.remove(removed_full_pathname)

        self.removed_row_count += len(removed_rows)
        self.processed_row_count += len(rows)

        # Update cleaned & removed stats for this file
        self.stats['cleaned_count'][voterset_filename] = len(amplify_rows)
        self.stats['cleaned_count']["total"] += self.stats["cleaned_count"][voterset_filename]

        self.stats['removed_count'][voterset_filename] = len(removed_rows)
        self.stats['removed_count']["total"] += self.stats["removed_count"][voterset_filename]

        return None

    def write_final_file(self):
        """
        write_final_file: Write the final CSV to S3.
        """
        # Need to write the FINAL file
        if len(self.final_rows) > 0:
            final_filename = self.metadata.get('mosaic_filename', 'MOSAIC.csv')
            final_full_pathname = f"/tmp/{final_filename}"
            write_csv_file(final_full_pathname, self.final_rows[0].keys(), self.final_rows)

            # Write to S3
            this_s3_key = os.path.join(self.metadata.get("s3_key", ""), final_filename)
            self.voterset_s3_connection.upload_file(filepathname=final_full_pathname,
                                                    bucket=self.voterset_s3_bucket,
                                                    key=this_s3_key)

            # Remove the temp file now
            os.remove(final_full_pathname)

    def write_stats_file(self):
        """
        write_stats_file: Write the stats JSON to S3.
        """
        # Write the STATS file
        stats_filename = "mosaic_stats.json"
        s3_key = os.path.join(self.metadata.get("s3_key", ""), stats_filename)
        # .encode() already returns bytes, so no extra bytes() wrapper is needed
        json_data = json.dumps(self.stats, indent=4).encode('UTF-8')
        self.voterset_s3_connection.put_object(bucket=self.voterset_s3_bucket,
                                               key=s3_key,
                                               data=json_data)
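
The row-cleaning loop in process_file leans on the phonenumbers package for its parse, is_possible, is_valid, E.164 chain. A minimal standalone sketch of that same flow; the function name and the comments about sample behavior are illustrative, not part of this library:

import phonenumbers

def clean_us_number(raw):
    """Return the E.164 form of a US number, or None if any check fails."""
    try:
        parsed = phonenumbers.parse(raw, 'US')
    except phonenumbers.NumberParseException:
        return None
    if not phonenumbers.is_possible_number(parsed):
        return None
    if not phonenumbers.is_valid_number(parsed):
        return None
    return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)

# A well-formed ten-digit US number comes back as '+1' followed by the digits;
# strings that cannot be parsed or fail validation come back as None.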

setup.py

@@ -9,7 +9,7 @@ with open('README.md', 'r') as f:
setup(
    name='lib_afc_mosiac',
-   version='0.1.2',
+   version='0.1.3',
    author='',
    author_email='',
    description='',

@@ -20,6 +20,6 @@ setup(
    packages=find_packages(),
    python_requires='>=3.7',
    install_requires=[
+       'phonenumbers==8.13.26',
    ],
)