"""
Helper functions for the Mosaic library
"""

import csv
import codecs


def read_voterdata_csv_stream(input_stream) -> list:
    """
    Read a CSV byte stream into a list of row dicts.

    :param input_stream: Binary file-like object positioned at the CSV data.

    :return: List of plain dicts, one per CSV data row, keyed by the header row.
    """
    # 'utf-8-sig' removes the annoying \ufeff BOM signature some exporters prepend.
    reader = csv.DictReader(codecs.getreader("utf-8-sig")(input_stream))
    # dict(row) replaces the original manual key-by-key copy loop; it also
    # normalizes any OrderedDict rows (older Pythons) to plain dicts.
    return [dict(row) for row in reader]


def write_csv_file(full_file_pathname, fieldnames, rows):
    """
    Write rows to a CSV file, header line first.

    :param full_file_pathname: Destination path for the CSV file.
    :param fieldnames: Iterable of column names; also becomes the header row.
    :param rows: Iterable of dicts to write, keyed by *fieldnames*.
    """
    with open(full_file_pathname, 'w', newline='', encoding='UTF-8') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        # writerows pushes the loop into the C-accelerated csv module.
        writer.writerows(rows)
"total": 0 }, + "removed_count": { "total": 0 }, + "cleaned_count": { "total": 0 }, + "removed_details": {} + } + + + def set_voterset_s3(self, s3_connection, bucket_name): + """ + set_voterset_s3 Set the S3 information for the 'voterset' bucket + + :param s3_connection: S3 Connection object + :param bucket_name: Bucket name + """ + + self.voterset_s3_connection = s3_connection + self.voterset_s3_bucket = bucket_name + + + def set_optout_s3(self, s3_connection, bucket_name): + """ + set_optout_s3 Set the S3 information for the 'optout' bucket + + :param s3_connection: S3 Connection object + :param bucket_name: Bucket name + """ + + self.optout_s3_connection = s3_connection + self.optout_s3_bucket = bucket_name + def read_mosaic_map_file( self, filename ): """ @@ -123,3 +178,225 @@ class AFCMUniverseMapFile: if ifo is None: return self.files.append(ifo) + + + def generate_mapped_file(self): + """ + generate_mapped_file Generate the mapped file details + """ + + # Loop the files from the map + for ufilename in self.mosaic_file_list: + self.process_file(ufilename) + # Run final steps + self.write_final_file() + self.write_stats_file() + + + def process_file(self, voterset_filename): + """ + process_file Process the file of the VoterSet + + :param voterset_filename: The VoterSet filename to process + """ + + amplify_rows = [] + removed_rows = [] + + voterset_filename_s3_key = os.path.join(self.metadata.get("s3_key", ""), voterset_filename) + file_data_stream = self.voterset_s3_connection.get_object(self.voterset_s3_bucket, + voterset_filename_s3_key) + rows = read_voterdata_csv_stream(file_data_stream) + logging.debug("rows = %s", len(rows)) + + mapping = self.mosaic_file_list[voterset_filename]['mapping'] + + if 'Cell_Phone' not in mapping: + logging.debug("'Cell_Phone' not mapped, skipping %s", voterset_filename) + return None + if 'field' not in mapping['Cell_Phone'] and 'combine' not in mapping['Cell_Phone']: + logging.debug("No field or combination mapped to 
'Cell_Phone', skipping %s", + voterset_filename) + return None + + # Update incoming stats after reading this file map + self.stats["incoming_count"][voterset_filename] = len(rows) + self.stats["incoming_count"]["total"] += self.stats["incoming_count"][voterset_filename] + + for i,r in enumerate(rows): + + if 'combine' in mapping['Cell_Phone']: + cell_phone_num = f"{r[mapping['Cell_Phone']['combine']['field1']]}{r[mapping['Cell_Phone']['combine']['field2']]}" + else: + cell_phone_num = r[mapping['Cell_Phone']['field']] + + # Evaluate this row has a valid phone number + try: + parsed_number = phonenumbers.parse(cell_phone_num,'US') + except Exception: # pylint: disable=broad-exception-caught + r['reason'] = f"bad row #{i} in {voterset_filename} reason='could not parse phone number'" + removed_rows.append(r) + if "could not parse phone number" in self.stats["removed_details"]: + self.stats["removed_details"]["could not parse phone number"] += 1 + else: + self.stats["removed_details"]["could not parse phone number"] = 1 + continue + + # is possible phone number + if phonenumbers.is_possible_number(parsed_number): + possible_number = parsed_number + else: + r['reason'] = f"bad row #{i} in {voterset_filename} reason='invalid phone number'" + removed_rows.append(r) + if "invalid phone number" in self.stats["removed_details"]: + self.stats["removed_details"]["invalid phone number"] += 1 + else: + self.stats["removed_details"]["invalid phone number"] = 1 + continue + + # is valid phone number + if phonenumbers.is_valid_number(possible_number): + formatted_number = phonenumbers.format_number(possible_number, + phonenumbers.PhoneNumberFormat.E164) + else: + r['reason'] = f"bad row #{i} in {voterset_filename} reason='invalid phone number'" + removed_rows.append(r) + if "invalid phone number" in self.stats["removed_details"]: + self.stats["removed_details"]["invalid phone number"] += 1 + else: + self.stats["removed_details"]["invalid phone number"] = 1 + continue + + # look up 
carrier errors and optouts + if formatted_number in self.error_numbers: + r['reason'] = f"bad row #{i} in {voterset_filename} reason='{self.error_numbers[formatted_number]}'" + removed_rows.append(r) + if self.error_numbers[formatted_number] in self.stats["removed_details"]: + self.stats["removed_details"][self.error_numbers[formatted_number]] += 1 + else: + self.stats["removed_details"][self.error_numbers[formatted_number]] = 1 + continue + + # last check, make sure we haven't added this number already + if formatted_number in self.running_phone_numbers: + r['reason'] = f"bad row #{i} in {voterset_filename} reason='duplicate phone number'" + removed_rows.append(r) + if "duplicate phone number" in self.stats["removed_details"]: + self.stats["removed_details"]["duplicate phone number"] += 1 + else: + self.stats["removed_details"]["duplicate phone number"] = 1 + continue + + new_row = { + 'UID': 'n/a', + 'FName': 'n/a', + 'LName': 'n/a', + 'Party': 'n/a', + 'State': 'n/a', + 'Precinct': 'n/a', + 'County': 'n/a', + 'URL': 'n/a' + } + + # Map each value to new_row + for amplify_key in list(new_row): + if mapping[amplify_key] is None: + continue + if 'value' in mapping[amplify_key]: + new_row[amplify_key] = mapping[amplify_key]['value'] + elif 'field' in mapping[amplify_key]: + old_key = mapping[amplify_key]['field'] + if old_key != '' and old_key in r: + new_row[amplify_key] = r[old_key] + else: + logging.error("Unrecognized mapping for %s: %s", + amplify_key, + mapping[amplify_key]) + + if new_row['UID'] is None or new_row['UID'] == 'n/a': + new_row['UID'] = uuid.uuid4() + + new_row['Cell_Phone'] = formatted_number + + amplify_rows.append(new_row) + self.final_rows.append(new_row) + + + (filename_prefix, extension) = os.path.splitext(voterset_filename) + + # Need to write *_PREPPED.csv + prep_file_name = f"{filename_prefix}_PREPPED.csv" + prep_full_pathname = f"/tmp/{prep_file_name}" + if len(amplify_rows) > 0: + write_csv_file(prep_full_pathname, 
amplify_rows[0].keys(), amplify_rows) + + # write S3 + this_s3_key = os.path.join(self.metadata.get("s3_key", ""), prep_file_name) + self.voterset_s3_connection.upload_file(filepathname=prep_full_pathname, + bucket=self.voterset_s3_bucket, + key=this_s3_key) + + # remove the temp file now + os.remove(prep_full_pathname) + + # Need to write *_REMOVED.csv + removed_file_name = f"{filename_prefix}_REMOVED.csv" + removed_full_pathname = f"/tmp/{removed_file_name}" + if len(removed_rows) > 0: + write_csv_file(removed_full_pathname, removed_rows[0].keys(), amplify_rows) + + # write S3 + this_s3_key = os.path.join(self.metadata.get("s3_key", ""), removed_file_name) + self.voterset_s3_connection.upload_file(filepathname=removed_full_pathname, + bucket=self.voterset_s3_bucket, + key=this_s3_key) + + # remove the temp file now + os.remove(removed_full_pathname) + + self.removed_row_count += len(removed_rows) + self.processed_row_count += len(rows) + + # Update cleaned & removed stats for this file + self.stats['cleaned_count'][voterset_filename] = len(amplify_rows) + self.stats['cleaned_count']["total"] += self.stats["cleaned_count"][voterset_filename] + + self.stats['removed_count'][voterset_filename] = len(removed_rows) + self.stats['removed_count']["total"] += self.stats["removed_count"][voterset_filename] + + return None + + + def write_final_file(self): + """ + write_final_file Writes the final CSV to S3 + """ + + # Need to write the FINAL file + if len(self.final_rows) > 0: + final_filename = self.metadata.get('mosaic_filename', 'MOSAIC.csv') + final_full_pathname = f"/tmp/{final_filename}" + write_csv_file(final_full_pathname, self.final_rows[0].keys(), self.final_rows) + + # write S3 + this_s3_key = os.path.join(self.metadata.get("s3_key", ""), final_filename) + self.voterset_s3_connection.upload_file(filepathname=final_full_pathname, + bucket=self.voterset_s3_bucket, + key=this_s3_key) + + # remove the temp file now + os.remove(final_full_pathname) + + + def 
write_stats_file(self): + """ + write_stats_file Writes the stats JSON to S3 + """ + + # Write the STATS file + stats_filename = "mosaic_stats.json" + s3_key = os.path.join(self.metadata.get("s3_key", ""), stats_filename) + json_data = bytes(json.dumps(self.stats, indent=4).encode('UTF-8')) + self.voterset_s3_connection.put_object(bucket=self.voterset_s3_bucket, + key=s3_key, + data=json_data) diff --git a/setup.py b/setup.py index b04cc62..ad0313b 100644 --- a/setup.py +++ b/setup.py @@ -9,7 +9,7 @@ with open('README.md', 'r') as f: setup( name='lib_afc_mosiac', - version='0.1.2', + version='0.1.3', author='', author_email='', description='', @@ -20,6 +20,6 @@ setup( packages=find_packages(), python_requires='>=3.7', install_requires=[ - + 'phonenumbers==8.13.26', ], )