diff --git a/BudaportaFvmConvert/CsvFile.py b/BudaportaFvmConvert/CsvFile.py new file mode 100644 index 0000000..7ae50ee --- /dev/null +++ b/BudaportaFvmConvert/CsvFile.py @@ -0,0 +1,29 @@ +import csv +from pathlib import Path +import logging + +from BudaportaFvmConvert.Reading import Reading + + +REQUIRED_CSV_COLUMNS = ['Notes', 'Meter n°', 'Reading', 'Reading data'] + + +class CsvFile: + logger = logging.getLogger(__name__) + + def __init__(self, filepath: Path) -> None: + + self.header_numbers = {} + self.readings = [] + with open(filepath, newline='', encoding="cp1252") as csvfile: + incsv = list(csv.reader(csvfile, delimiter=';')) + header_row = incsv.pop(0) + for i, col_name in enumerate(header_row): + if col_name in REQUIRED_CSV_COLUMNS: + self.header_numbers[col_name] = i + self.logger.debug(f"header numbers: {self.header_numbers}") + for csv_row in incsv: + self.readings.append(Reading(csv_row, self.header_numbers)) + + def get_readings(self) -> list[Reading]: + return self.readings diff --git a/BudaportaFvmConvert/Reading.py b/BudaportaFvmConvert/Reading.py new file mode 100644 index 0000000..77bfeeb --- /dev/null +++ b/BudaportaFvmConvert/Reading.py @@ -0,0 +1,51 @@ +import logging +import datetime + + + +class Reading: + logger = logging.getLogger(__name__) + + def __init__(self, csv_row: list, header_numbers: dict) -> None: + self.logger.debug(f"init Reading, row: {csv_row}") + self.data_dict = { + "KeszulekAzon": "", + "Gyariszam": "", + "MeroAllas": "", + "MertekEgyseg": "m3", + "LeoMod": "20", + "LeoMegjegyzes": "06", + "TenyLeoDatum": "", + "TenyLeoIdo": "" + } + self.valid = True + + + for col_name, col_num in header_numbers.items(): + try: + if col_name == "Notes": + self.data_dict["KeszulekAzon"] = csv_row[col_num] + elif col_name == "Meter n°": + self.data_dict["Gyariszam"] = csv_row[col_num] + elif col_name == "Reading": + self.data_dict["MeroAllas"] = csv_row[col_num]\ + .replace(',', '.').replace(' ', '') + elif col_name == "Reading data": + reading_date = datetime.datetime.strptime( + csv_row[col_num], '%d. %m. %Y %H:%M:%S') + self.data_dict["TenyLeoDatum"] = reading_date.strftime( + '%Y%m%d') + self.data_dict["TenyLeoIdo"] = reading_date.strftime('%H%M%S') + + except Exception as e: + self.valid = False + self.logger.warn(f"Malformed Reading column: {col_name}: {csv_row[col_num]}") + self.logger.debug(f"Exception: {str(e)}") + + if not self.valid: + self.logger.info(f"Invalid reading found: {self.data_dict}") + + def is_valid(self) -> bool: + return self.valid + + diff --git a/BudaportaFvmConvert/XmlFile.py b/BudaportaFvmConvert/XmlFile.py new file mode 100644 index 0000000..85adf57 --- /dev/null +++ b/BudaportaFvmConvert/XmlFile.py @@ -0,0 +1,62 @@ +import logging +import datetime +from pathlib import Path +import xml.etree.ElementTree as ET +import xmlformatter + + +from BudaportaFvmConvert.Reading import Reading + +# format: date, number +FILENAMETEMPLATE = "SZLA_{}_{}.XML" + + +class XmlFile: + logger = logging.getLogger(__name__) + + def __init__(self, file_date: datetime.datetime, file_dir: Path) -> None: + + file_date_str = file_date.strftime('%Y%m%d%H%M%S') + file_num = 1 + self.file_path = None + + while not self.file_path: + file_name = FILENAMETEMPLATE.format( + file_date_str, str(file_num).zfill(2)) + file_path = file_dir.joinpath(file_name) + if file_path.exists(): + file_num = file_num+1 + else: + self.file_path = file_path + + if not self.file_path: + raise Exception("File name not found!") + + self.logger.debug(f"File name found: {self.file_path}") + + # Init xml root: + self.xml_root = ET.Element('Leolvasasok') + + def add_reading(self, reading: Reading) -> None: + current_subelement = ET.SubElement(self.xml_root, 'Leolvasas') + for data_key, data in reading.data_dict.items(): + a = ET.SubElement(current_subelement, data_key) + a.text = data + + def get_xml_string(self) -> bytes: + xml_string = ET.tostring( + self.xml_root, encoding="utf-8", xml_declaration=True) + formatter = xmlformatter.Formatter(indent=2, indent_char=" ") + formatted_string_bytes = formatter.format_string(xml_string) + + # Fix line ending: + if not b'\r\n' in formatted_string_bytes: + formatted_string_bytes = \ + formatted_string_bytes.replace(b'\n', b'\r\n') + + return formatted_string_bytes + + def save_xml_file(self) -> None: + if self.file_path: + with open(self.file_path, mode="xb") as f: + f.write(self.get_xml_string()) diff --git a/BudaportaFvmConvert/__init__.py b/BudaportaFvmConvert/__init__.py index 71b2d12..ddd7f49 100644 --- a/BudaportaFvmConvert/__init__.py +++ b/BudaportaFvmConvert/__init__.py @@ -3,10 +3,12 @@ import argparse from importlib.metadata import metadata from pathlib import Path -import csv import datetime +import logging +import sys -REQUIRED_CSV_COLUMNS = ['Notes', 'Meter n°', 'Reading', 'Reading data'] +from BudaportaFvmConvert.XmlFile import XmlFile +from BudaportaFvmConvert.CsvFile import CsvFile METADATA = metadata("BudaportaFvmConvert") @@ -26,72 +28,37 @@ def run(): parser.add_argument("source", type=Path, - help="Path to a csv file or a folder", - nargs="+" + help="Path to a csv file or a folder" ) args = parser.parse_args() + # Start logging: + logging.basicConfig(level=logging.INFO) + logging.debug(f"Arguments: {vars(args)}") + csvpaths = [] + output_dir = Path() + + if args.source.is_dir(): + csvpaths.extend(args.source.glob('**/*.csv')) + output_dir = args.source + elif args.source.suffix == ".csv": + csvpaths.append(args.source) + output_dir = args.source.parent + + if not csvpaths: + sys.exit("No csv file found!") + + # Get Readings from csvfiles: readings = [] - - for path in args.source: - if path.is_dir(): - csvpaths.extend(get_csv_paths(path)) - elif path.suffix == ".csv": - csvpaths.append(path) - for csvpath in csvpaths: readings.extend(CsvFile(csvpath).get_readings()) + xml_file = XmlFile(datetime.datetime.now(), output_dir) -def get_csv_paths(dp: Path): - paths = [] - if dp.is_dir(): - for p in dp.iterdir(): - if p.suffix == ".csv": - paths.append(p) - return paths + for reading in readings: + if reading.is_valid(): + xml_file.add_reading(reading) - -class CsvFile: - def __init__(self, filepath: Path) -> None: - self.header_numbers = {} - self.readings = [] - with open(filepath, newline='', encoding="cp1252") as csvfile: - incsv = list(csv.reader(csvfile, delimiter=';')) - header_row = incsv.pop(0) - for i, col_name in enumerate(header_row): - if col_name in REQUIRED_CSV_COLUMNS: - self.header_numbers[col_name] = i - for csv_row in incsv: - self.readings.append(Reading(csv_row, self.header_numbers)) - - def get_readings(self) -> list["Reading"]: - return self.readings - -class Reading: - def __init__(self, csv_row: list, header_numbers: dict) -> None: - self.data_dict = { - "KeszulekAzon": "", - "Gyariszam": "", - "MeroAllas": "", - "MertekEgyseg": "m3", - "LeoMod": "20", - "LeoMegjegyzes": "06", - "TenyLeoDatum": "", - "TenyLeoIdo": "" - } - for col_name, col_num in header_numbers.items(): - if col_name == "Notes": - self.data_dict["KeszulekAzon"] = csv_row[col_num] - elif col_name == "Meter n°": - self.data_dict["Gyariszam"] = csv_row[col_num] - elif col_name == "Reading": - self.data_dict["MeroAllas"] = csv_row[col_num]\ - .replace(',', '.').replace(' ', '') - elif col_name == "Reading data": - reading_date = datetime.datetime.strptime( - csv_row[col_num], '%d. %m. %Y %H:%M:%S') - self.data_dict["TenyLeoDatum"] = reading_date.strftime('%Y%m%d') - self.data_dict["TenyLeoIdo"] = reading_date.strftime('%H%M%S') + xml_file.save_xml_file()