""" AFCMUniverseMapFile Class object """
import codecs
import json
import logging
import os
import tempfile
import time
import uuid

import phonenumbers

from lib_afc_mosaic.helpers import (
    read_voterdata_csv_stream,
    write_csv_file
)


class AFCMUniverseMapFile:
    """ AFCMUniverseMapFile class

    Holds a Mosaic "universe map" (the VoterSet CSV files to combine and how
    their columns map onto the output fields). ``generate_mapped_file`` cleans
    the phone numbers of every mapped file and writes the per-file
    ``*_PREPPED.csv`` / ``*_REMOVED.csv`` files, the combined final CSV and a
    ``mosaic_stats.json`` file to the 'voterset' S3 bucket.
    """

    # Output columns of every cleaned row, in order ('Cell_Phone' is appended last).
    _AMPLIFY_FIELDS = ('UID', 'FName', 'LName', 'Party', 'State', 'Precinct', 'County', 'URL')
    # Placeholder for output columns that have no mapping or no source value.
    _MISSING_VALUE = 'n/a'

    def __init__(self, mosaic_filename=None):
        """ __init__

        :param mosaic_filename: Optional name of the map file (stored only)
        """
        self.mosaic_filename = mosaic_filename
        self.mosaic_head = {}
        self.metadata = {}
        # Keyed by VoterSet filename; each value holds that file's 'mapping'.
        self.mosaic_file_list = {}
        self.voterset_key = None
        self.files = []
        self.voterset_s3_connection = None
        self.voterset_s3_bucket = None
        self.optout_s3_connection = None
        self.optout_s3_bucket = None
        # Cleaned rows accumulated across every processed file.
        self.final_rows = []
        self.removed_row_count = 0
        self.processed_row_count = 0
        # Watch for duplicates across files
        self.running_phone_numbers = {}
        # Track error numbers (E.164 number -> removal reason), loaded from errors.json
        self.error_numbers = {}
        self.stats = {
            "incoming_count": {"total": 0},
            "removed_count": {"total": 0},
            "cleaned_count": {"total": 0},
            "removed_details": {},
            "timings": {}
        }

    def set_voterset_s3(self, s3_connection, bucket_name):
        """ set_voterset_s3
        Set the S3 information for the 'voterset' bucket

        :param s3_connection: S3 Connection object
        :param bucket_name: Bucket name
        """
        self.voterset_s3_connection = s3_connection
        self.voterset_s3_bucket = bucket_name

    def set_optout_s3(self, s3_connection, bucket_name):
        """ set_optout_s3
        Set the S3 information for the 'optout' bucket

        :param s3_connection: S3 Connection object
        :param bucket_name: Bucket name
        """
        self.optout_s3_connection = s3_connection
        self.optout_s3_bucket = bucket_name

    def read_mosaic_map_file(self, filename):
        """ read_mosaic_map_file
        Load a universe map from a JSON file (a UTF-8 BOM is tolerated).
        Populates mosaic_head / metadata / mosaic_file_list from the
        'mosaic', 'metadata' and 'files' keys when they are present.

        :param filename: Path of the JSON map file
        :return: The parsed JSON data, or None when no filename is given
        """
        if not filename:
            return None

        with open(filename, encoding='utf-8-sig') as json_file:
            data = json.load(json_file)

        if 'mosaic' in data:
            self.mosaic_head = data['mosaic']
        if 'metadata' in data:
            self.metadata = data['metadata']
        if 'files' in data:
            self.mosaic_file_list = data['files']

        return data

    def read_mosaic_map_dict(self, voterset_key: str, map_dict: dict):
        """ read_mosaic_map_dict
        Load a universe map from an already-parsed dictionary.

        :param voterset_key: VoterSet key reported by get_voter_object
        :param map_dict: Map with optional 'mosaic', 'metadata', 'files' keys
        :return: None (also None, with no state change, if map_dict is not a dict)
        """
        if not isinstance(map_dict, dict):
            return None

        self.mosaic_head = map_dict.get('mosaic', {})
        self.metadata = map_dict.get('metadata', {})
        self.mosaic_file_list = map_dict.get('files', {})
        self.voterset_key = voterset_key

        return None

    def write_mosaic_map_file(self, filename):
        """ write_mosaic_map_file
        Write the current map back out as indented JSON.

        :param filename: Destination path; nothing is written when empty/None
        :return: None
        """
        if not filename:
            return None

        output_object = json.dumps({"mosaic": self.mosaic_head,
                                    "metadata": self.metadata,
                                    "files": self.mosaic_file_list}, indent=4)
        with open(filename, 'w', encoding="UTF-8") as outfile:
            outfile.write(output_object)

        return None

    def create_new_input_file(self, filename=None):
        """ create_new_input_file
        Build an empty input-file description template.

        :param filename: Name to store in the template
        :return: New template dictionary
        """
        mosaic_input_file = {
            "filename": filename,
            "details": {
                "csv_get_url": None,
                "csv_get_prepped_url": None,
                "csv_get_removed_url": None,
                "csv_put_url": None,
                "mapping": {
                    "UID": None,
                    "FName": None,
                    "LName": None,
                    "Cell_Phone": None,
                    "Party": None,
                    "State": None,
                    "Precinct": None,
                    "County": None,
                    "URL": None
                },
                "operations": {
                    "insert": {"field": "*", "operator": "=", "value": "*"}
                }
            }
        }
        return mosaic_input_file

    def add_new_input_file(self, ifo):
        """ add_new_input_file
        Append an input-file description to self.files (None is ignored).

        :param ifo: Input-file description
        """
        if ifo is None:
            return
        self.files.append(ifo)

    def generate_mapped_file(self):
        """ generate_mapped_file
        Generate the mapped file details.

        Loads errors.json from the 'optout' bucket, processes every file in
        the map, then writes the final CSV and the stats JSON.

        :raises Exception: if errors.json cannot be fetched or parsed
        """
        errors_filename_s3_key = "errors.json"
        try:
            file_data_stream = self.optout_s3_connection.get_object(self.optout_s3_bucket, errors_filename_s3_key)
            self.error_numbers = json.load(codecs.getreader("utf-8")(file_data_stream))
        except Exception as ex:
            raise Exception(f"universe_map_file.generate_mapped_file: Failed to get {self.optout_s3_bucket}/{errors_filename_s3_key}: {ex}") from ex

        # Loop the files from the map
        for ufilename in self.mosaic_file_list:
            self.process_file(ufilename)

        # Run final steps
        self.write_final_file()
        self.write_stats_file()

    def _s3_key_for(self, filename):
        """Return the S3 key for *filename* under the metadata 's3_key' prefix (null-safe)."""
        return os.path.join(self.metadata.get("s3_key") or "", filename)

    @staticmethod
    def _csv_keys_needed(mapping):
        """Return the de-duplicated CSV column names referenced by *mapping*.

        Both 'field' and 'combine' entries are collected. The Cell_Phone row
        logic prefers 'combine' even when 'field' is also present, so both
        kinds of column must be read.
        """
        keys = []
        for spec in mapping.values():
            if not isinstance(spec, dict):
                continue
            if 'field' in spec:
                keys.append(spec['field'])
            if 'combine' in spec:
                keys.extend(col for name, col in spec['combine'].items() if name.startswith('field'))
        return list(dict.fromkeys(keys))

    @staticmethod
    def _cell_phone_value(row, cell_spec):
        """Return the raw phone value for *row*: concatenated 'combine' fields or the single 'field'."""
        if 'combine' in cell_spec:
            combine = cell_spec['combine']
            return f"{row[combine['field1']]}{row[combine['field2']]}"
        return row[cell_spec['field']]

    def _reject_row(self, removed_rows, voterset_filename, row_index, row, reason):
        """Tag *row* with an 'afc_error', queue it for the REMOVED file and count *reason*."""
        row['afc_error'] = f"bad row #{row_index} in {voterset_filename}; reason='{reason}'"
        removed_rows.append(row)
        details = self.stats["removed_details"]
        details[reason] = details.get(reason, 0) + 1

    @classmethod
    def _map_row(cls, row, mapping, formatted_number):
        """Build one cleaned output row from source *row* using *mapping*.

        A mapping spec may supply a constant 'value' or a source 'field'.
        Missing or None specs leave the placeholder. A missing UID is replaced
        by a random UUID.
        """
        new_row = {key: cls._MISSING_VALUE for key in cls._AMPLIFY_FIELDS}
        for amplify_key in cls._AMPLIFY_FIELDS:
            spec = mapping.get(amplify_key)
            if spec is None:
                continue
            if not isinstance(spec, dict):
                logging.error("Unrecognized mapping for %s: %s", amplify_key, spec)
                continue
            if 'value' in spec:
                new_row[amplify_key] = spec['value']
            elif 'field' in spec:
                old_key = spec['field']
                if old_key != '' and old_key in row:
                    new_row[amplify_key] = row[old_key]
            else:
                logging.error("Unrecognized mapping for %s: %s", amplify_key, spec)

        if new_row['UID'] is None or new_row['UID'] == cls._MISSING_VALUE:
            new_row['UID'] = str(uuid.uuid4())
        new_row['Cell_Phone'] = formatted_number
        return new_row

    def _write_and_upload_csv(self, rows, file_name, best_effort=False):
        """Write *rows* to a local temp CSV, upload it to the voterset bucket, then delete it.

        The local file uses only the basename of *file_name*, so names with
        sub-directories do not need matching local directories. The S3 key
        keeps the full *file_name*.

        :param rows: List of row dicts; the first row's keys become the header
        :param file_name: File name relative to the metadata 's3_key' prefix
        :param best_effort: If True, a CSV write failure is logged and the
            upload is skipped instead of raising
        :return: True if the file was uploaded, False otherwise
        """
        if not rows:
            return False

        local_path = os.path.join(tempfile.gettempdir(), os.path.basename(file_name))
        try:
            try:
                write_csv_file(local_path, rows[0].keys(), rows)
            except Exception as ex:  # pylint: disable=broad-exception-caught
                if not best_effort:
                    raise
                # Never upload a missing or partially written file.
                logging.error("[WRITING %s] [%s]", file_name, ex)
                return False

            self.voterset_s3_connection.upload_file(filepathname=local_path,
                                                    bucket=self.voterset_s3_bucket,
                                                    key=self._s3_key_for(file_name))
            return True
        finally:
            # Always clean up the temp file, including after upload failures.
            try:
                os.remove(local_path)
            except FileNotFoundError:
                pass

    def process_file(self, voterset_filename):
        """ process_file
        Process the file of the VoterSet.

        Reads the mapped columns from S3 and validates each row's phone
        number. Rows are rejected when the number cannot be parsed, is not
        possible/valid, is listed in errors.json, or duplicates a number
        already accepted. Accepted rows are mapped to the output columns.
        Writes <name>_PREPPED.csv and <name>_REMOVED.csv and updates
        counters, stats and timings.

        :param voterset_filename: The VoterSet filename to process
        :return: None
        :raises Exception: if the VoterSet file cannot be fetched from S3
        """
        overall_time_start = time.time()

        file_map = self.mosaic_file_list[voterset_filename]
        if 'mapping' not in file_map:
            logging.debug("Missing 'mapping' key, skipping %s", voterset_filename)
            return None

        mapping = file_map['mapping']
        cell_spec = mapping.get('Cell_Phone')
        if cell_spec is None:
            logging.debug("'Cell_Phone' not mapped, skipping %s", voterset_filename)
            return None
        if not isinstance(cell_spec, dict) or ('field' not in cell_spec and 'combine' not in cell_spec):
            logging.debug("No field or combination mapped to 'Cell_Phone', skipping %s", voterset_filename)
            return None

        # Faster & less memory to only read the keys we need from CSV
        csv_keys_needed = self._csv_keys_needed(mapping)

        amplify_rows = []
        removed_rows = []

        reading_time_start = time.time()
        voterset_filename_s3_key = self._s3_key_for(voterset_filename)
        try:
            file_data_stream = self.voterset_s3_connection.get_object(self.voterset_s3_bucket, voterset_filename_s3_key)
        except Exception as ex:
            raise Exception(f"universe_map_file.process_file: Failed to get {self.voterset_s3_bucket}/{voterset_filename_s3_key}: {ex}") from ex
        rows = read_voterdata_csv_stream(file_data_stream, csv_keys_needed)
        reading_time = time.time() - reading_time_start
        logging.debug("rows = %s", len(rows))

        # Update incoming stats after reading this file map
        self.stats["incoming_count"][voterset_filename] = len(rows)
        self.stats["incoming_count"]["total"] += len(rows)

        processing_time_start = time.time()
        for i, r in enumerate(rows):
            cell_phone_num = self._cell_phone_value(r, cell_spec)

            # Evaluate this row has a valid phone number
            try:
                parsed_number = phonenumbers.parse(cell_phone_num, 'US')
            except Exception:  # pylint: disable=broad-exception-caught
                self._reject_row(removed_rows, voterset_filename, i, r, "could not parse phone number")
                continue

            # Must be both a possible and a valid number
            if not (phonenumbers.is_possible_number(parsed_number)
                    and phonenumbers.is_valid_number(parsed_number)):
                self._reject_row(removed_rows, voterset_filename, i, r, "invalid phone number")
                continue

            formatted_number = phonenumbers.format_number(parsed_number, phonenumbers.PhoneNumberFormat.E164)

            # look up carrier errors and optouts
            if formatted_number in self.error_numbers:
                self._reject_row(removed_rows, voterset_filename, i, r, self.error_numbers[formatted_number])
                continue

            # last check, make sure we haven't added this number already (across all files)
            if formatted_number in self.running_phone_numbers:
                self._reject_row(removed_rows, voterset_filename, i, r, "duplicate phone number")
                continue

            new_row = self._map_row(r, mapping, formatted_number)
            amplify_rows.append(new_row)
            self.final_rows.append(new_row)
            self.running_phone_numbers[formatted_number] = True
        processing_time = time.time() - processing_time_start

        filename_prefix = os.path.splitext(voterset_filename)[0]

        # Need to write *_PREPPED.csv
        writing_cleaned_time_start = time.time()
        self._write_and_upload_csv(amplify_rows, f"{filename_prefix}_PREPPED.csv", best_effort=True)
        writing_cleaned_time = time.time() - writing_cleaned_time_start

        # Need to write *_REMOVED.csv
        writing_removed_time_start = time.time()
        self._write_and_upload_csv(removed_rows, f"{filename_prefix}_REMOVED.csv", best_effort=True)
        writing_removed_time = time.time() - writing_removed_time_start

        self.removed_row_count += len(removed_rows)
        self.processed_row_count += len(rows)

        # Update stats for this file
        self.stats['cleaned_count'][voterset_filename] = len(amplify_rows)
        self.stats['cleaned_count']["total"] += len(amplify_rows)
        self.stats['removed_count'][voterset_filename] = len(removed_rows)
        self.stats['removed_count']["total"] += len(removed_rows)

        overall_time = time.time() - overall_time_start
        self.stats["timings"][voterset_filename] = {
            "reading": reading_time,
            "processing": processing_time,
            "writing cleaned file": writing_cleaned_time,
            "writing removed file": writing_removed_time,
            "file total": overall_time
        }

        return None

    def write_final_file(self):
        """ write_final_file
        Writes the final CSV (all cleaned rows) to S3.

        Uses metadata 'mosaic_filename', falling back to 'MOSAIC.csv' when it
        is missing or null. Nothing is written when there are no rows. Write
        and upload errors propagate; the temp file is always removed.
        """
        if not self.final_rows:
            return
        final_filename = self.metadata.get('mosaic_filename') or 'MOSAIC.csv'
        self._write_and_upload_csv(self.final_rows, final_filename)

    def write_stats_file(self):
        """ write_stats_file
        Writes the stats dictionary as indented UTF-8 JSON (mosaic_stats.json) to S3.
        """
        json_data = json.dumps(self.stats, indent=4).encode('UTF-8')
        self.voterset_s3_connection.put_object(bucket=self.voterset_s3_bucket,
                                               key=self._s3_key_for("mosaic_stats.json"),
                                               data=json_data)

    def get_voter_object(self) -> "dict | None":
        """ get_voter_object
        Get the voter to the API object.

        Built from the first cleaned row only. Columns other than UID, FName,
        LName and Cell_Phone go into 'data_fields'.

        :return: Dictionary object of the AF-VOTER, or None if there are no cleaned rows
        """
        # Check if there is data
        if not self.final_rows:
            return None

        # Build the AF Voter object
        voter_row = self.final_rows[0]
        filter_set = {'UID', 'FName', 'LName', 'Cell_Phone'}
        data_fields_dict = {k: v for k, v in voter_row.items() if k not in filter_set}
        return {
            "voter_id": voter_row['UID'],
            "first_name": voter_row['FName'],
            "last_name": voter_row['LName'],
            "voterset_keys": [self.voterset_key],
            "data_fields": data_fields_dict,
            "cell_phone": voter_row['Cell_Phone']
        }