46 lines
1.1 KiB
Python
46 lines
1.1 KiB
Python
"""
|
|
Helper functions for the Mosaic library
|
|
"""
|
|
|
|
import csv
|
|
import codecs
|
|
|
|
def read_voterdata_csv_stream(input_stream, keys_needed=None) -> list:
    """
    read_voterdata_csv_stream: Read csv stream to list

    The stream is wrapped in a 'utf-8-sig' codecs reader and parsed with
    csv.DictReader, so the first line of the stream is the header row.

    :param input_stream: Input stream to read from; it is decoded here, so it
        must yield bytes
    :param keys_needed: Optional collection of column names to keep in every
        row. When None or empty, the columns of the first data row are kept.
    :return List of parsed csv rows, one dict per data row
    :raises KeyError: if a requested key is not present in a parsed row
    """
    # An empty selection is treated like None (keep every column). Previously
    # an empty keys_needed left key_list unassigned, which raised
    # UnboundLocalError as soon as the first data row was processed.
    if keys_needed is not None and len(keys_needed) > 0:
        key_list = keys_needed
    else:
        key_list = None

    # note: using 'utf-8-sig' removes the annoying \ufeff BOM signature
    reader = csv.DictReader(codecs.getreader("utf-8-sig")(input_stream))

    rows = []
    for row in reader:
        if key_list is None:
            # No explicit selection: reuse the first row's keys for all rows.
            key_list = list(row.keys())
        rows.append({k: row[k] for k in key_list})

    return rows
|
|
|
|
|
|
def write_csv_file(full_file_pathname, fieldnames, rows):
    """
    write_csv_file: Write a header line followed by the given rows to a csv file

    :param full_file_pathname: Path of the file to write; it is opened in 'w'
        mode with UTF-8 encoding
    :param fieldnames: Column names handed to csv.DictWriter
    :param rows: Iterable of row dicts, written in iteration order
    """
    with open(full_file_pathname, 'w', newline='', encoding='UTF-8') as out_file:
        csv_writer = csv.DictWriter(out_file, fieldnames=fieldnames)
        csv_writer.writeheader()
        csv_writer.writerows(rows)
|