Modified to read only the keys needed from the CSV file.

This commit is contained in:
Rick Ross 2023-12-08 13:26:49 -08:00
parent 2f8d06dff4
commit 580c25f36e
2 changed files with 33 additions and 14 deletions

View File

@ -5,7 +5,7 @@ Helper functions for the Mosaic library
import csv
import codecs
def read_voterdata_csv_stream(input_stream) -> list:
def read_voterdata_csv_stream(input_stream, keys_needed=None) -> list:
"""
read_voterdata_csv_stream: Read csv stream to list
@ -16,8 +16,12 @@ def read_voterdata_csv_stream(input_stream) -> list:
rows = []
# note: using 'utf-8-sig' removes the annoying \ufeff BOM signature
if keys_needed is None or len(keys_needed) > 0:
key_list = keys_needed
reader = csv.DictReader(codecs.getreader("utf-8-sig")(input_stream))
for row in reader:
if key_list is None:
key_list = row.keys()
new_row = {}
for k in key_list:

View File

@ -202,18 +202,9 @@ class AFCMUniverseMapFile:
:param voterset_filename: The VoterSet filename to process
"""
amplify_rows = []
removed_rows = []
try:
voterset_filename_s3_key = os.path.join(self.metadata.get("s3_key", ""), voterset_filename)
file_data_stream = self.voterset_s3_connection.get_object(self.voterset_s3_bucket,
voterset_filename_s3_key)
except Exception as ex:
raise Exception(f"universe_map_file.process_file: Failed to get {self.voterset_s3_bucket}/{voterset_filename_s3_key}: {ex}") from ex
rows = read_voterdata_csv_stream(file_data_stream)
logging.debug("rows = %s", len(rows))
if 'mapping' not in self.mosaic_file_list[voterset_filename]:
logging.debug("Missing 'mapping' key, skipping %s", voterset_filename)
return None
mapping = self.mosaic_file_list[voterset_filename]['mapping']
@ -225,6 +216,30 @@ class AFCMUniverseMapFile:
voterset_filename)
return None
# Faster & less memory to only read the keys we need from CSV
csv_keys_needed = []
for k,v in mapping.items():
if isinstance(v, dict):
if 'field' in v:
csv_keys_needed.append(v['field'])
elif 'combine' in v:
for k1,v1 in v['combine'].items():
if k1.startswith('field'):
csv_keys_needed.append(v1)
amplify_rows = []
removed_rows = []
try:
voterset_filename_s3_key = os.path.join(self.metadata.get("s3_key", ""), voterset_filename)
file_data_stream = self.voterset_s3_connection.get_object(self.voterset_s3_bucket,
voterset_filename_s3_key)
except Exception as ex:
raise Exception(f"universe_map_file.process_file: Failed to get {self.voterset_s3_bucket}/{voterset_filename_s3_key}: {ex}") from ex
rows = read_voterdata_csv_stream(file_data_stream, csv_keys_needed)
logging.debug("rows = %s", len(rows))
# Update incoming stats after reading this file map
self.stats["incoming_count"][voterset_filename] = len(rows)
self.stats["incoming_count"]["total"] += self.stats["incoming_count"][voterset_filename]