# lib-af-mosaic/lib_afc_mosiac/universe_map_file.py
#
# 123 lines
# 2.9 KiB
# Python

"""
AFCMUniverseMapFile Class object
"""
import json
class AFCMUniverseMapFile:
    """
    In-memory model of a "universe map file".

    The file is a JSON document with three top-level sections:

    * ``mosaic``   -- header holding the file type and version
    * ``metadata`` -- mosaic file name/url, tags and row counters
    * ``files``    -- the input file entries

    Instances keep one attribute per section (``mosaic_head``,
    ``metadata``, ``mosaic_file_list``) plus ``files``, the list of
    entries added through :meth:`add_new_input_file`.
    """

    # Placeholder kept as the metadata default so that instances built
    # without a file name serialize exactly as they always have.
    _DEFAULT_MOSAIC_FILENAME = "TeirabDelegateTextFile_MOSIAC.csv"

    def __init__(self, mosaic_filename=None):
        """
        Build an empty map with default header and metadata.

        :param mosaic_filename: name of the mosaic file.  A non-empty
            string is also recorded as ``metadata["mosaic_filename"]``;
            any other value leaves the placeholder default in place so
            the metadata always stays JSON-serializable.
        """
        self.mosaic_filename = mosaic_filename

        self.mosaic_head = {
            "filetype": "universe map file",
            "version": '0.10',
        }

        if isinstance(mosaic_filename, str) and mosaic_filename:
            metadata_filename = mosaic_filename
        else:
            metadata_filename = self._DEFAULT_MOSAIC_FILENAME

        self.metadata = {
            "mosaic_filename": metadata_filename,
            "mosaic_file_url": None,
            "tags": None,
            "total_rows": 0,
            "total_removed": 0,
            "total_optout": 0,
            "processing_time": 0,
        }

        # Content of the "files" section as loaded by read_mosaic_map_file.
        self.mosaic_file_list = {}
        # Entries added in memory through add_new_input_file.
        self.files = []

    def read_mosaic_map_file(self, filename):
        """
        Load a map file and copy its sections onto this instance.

        Only sections present in the file replace the current state; a
        missing section leaves the corresponding attribute untouched.

        :param filename: path of the JSON map file (a UTF-8 BOM is accepted)
        :return: the parsed JSON document, or None when ``filename`` is
            empty or None
        :raises OSError: if the file cannot be opened
        :raises json.JSONDecodeError: if the file is not valid JSON
        """
        if not filename:
            return None

        with open(filename, encoding='utf-8-sig') as json_file:
            data = json.load(json_file)

        # A top-level value that is not a JSON object has no sections to
        # apply; hand it back untouched instead of failing on the lookups.
        if not isinstance(data, dict):
            return data

        if 'mosaic' in data:
            self.mosaic_head = data['mosaic']
        if 'metadata' in data:
            self.metadata = data['metadata']
        if 'files' in data:
            self.mosaic_file_list = data['files']

        return data

    def write_mosaic_map_file(self, filename):
        """
        Serialize the current state to ``filename`` as indented JSON.

        :param filename: destination path; nothing is written when it is
            empty or None
        :return: always None
        :raises TypeError: if a section holds a value json cannot encode
        :raises OSError: if the file cannot be written
        """
        if not filename:
            return None

        document = {
            "mosaic": self.mosaic_head,
            "metadata": self.metadata,
            "files": self._files_section(),
        }

        # Encode before opening: a serialization error must not leave an
        # existing file truncated.
        output_object = json.dumps(document, indent=4)

        with open(filename, 'w', encoding="UTF-8") as outfile:
            outfile.write(output_object)

        return None

    def _files_section(self):
        """
        Return the value to store under "files" when writing.

        Entries added through add_new_input_file live in ``self.files``
        and used to be left out of the written file.  They are included
        here whenever that can be done without guessing at the layout.
        """
        loaded = self.mosaic_file_list
        pending = self.files

        if not pending:
            # Nothing was added: write exactly what was loaded/assigned.
            return loaded
        if not loaded:
            return list(pending)
        if isinstance(loaded, list):
            # Skip entries already present so aliasing both attributes to
            # the same list does not duplicate anything.
            return loaded + [entry for entry in pending if entry not in loaded]

        # NOTE(review): a non-empty, non-list "files" section cannot be
        # merged with a list of entries unambiguously; it is written as-is
        # and the entries in self.files are not included.
        return loaded

    def create_new_input_file(self, filename=None):
        """
        Build a blank input file entry.

        The entry is only returned; pass it to add_new_input_file to
        attach it to this map.

        :param filename: value stored under the entry's "filename" key
        :return: dict with "filename" and a "details" template whose
            urls and column mappings are all None
        """
        mosaic_input_file = {
            "filename": filename,
            "details": {
                "csv_get_url": None,
                "csv_get_prepped_url": None,
                "csv_get_removed_url": None,
                "csv_put_url": None,
                "mapping": {
                    "UID": None,
                    "FName": None,
                    "LName": None,
                    "Cell_Phone": None,
                    "Party": None,
                    "State": None,
                    "Precinct": None,
                    "County": None,
                    "URL": None,
                },
                "operations": {
                    "insert": {"field": "*", "operator": "=", "value": "*"},
                },
            },
        }
        return mosaic_input_file

    def add_new_input_file(self, ifo):
        """
        Append an input file entry to ``self.files``.

        :param ifo: entry to add (e.g. one built by create_new_input_file);
            None is ignored
        """
        if ifo is None:
            return
        self.files.append(ifo)