This commit is contained in:
2024-02-08 22:18:18 +01:00
parent 398738f8db
commit 809e20cb80
4 changed files with 169 additions and 60 deletions

View File

@@ -0,0 +1,29 @@
import csv
from pathlib import Path
import logging
from BudaportaFvmConvert.Reading import Reading
REQUIRED_CSV_COLUMNS = ['Notes', 'Meter n°', 'Reading', 'Reading data']
class CsvFile:
logger = logging.getLogger(__name__)
def __init__(self, filepath: Path) -> None:
self.header_numbers = {}
self.readings = []
with open(filepath, newline='', encoding="cp1252") as csvfile:
incsv = list(csv.reader(csvfile, delimiter=';'))
header_row = incsv.pop(0)
for i, col_name in enumerate(header_row):
if col_name in REQUIRED_CSV_COLUMNS:
self.header_numbers[col_name] = i
self.logger.debug(f"header numbers: {self.header_numbers}")
for csv_row in incsv:
self.readings.append(Reading(csv_row, self.header_numbers))
def get_readings(self) -> list[Reading]:
return self.readings

View File

@@ -0,0 +1,51 @@
import logging
import datetime
class Reading:
logger = logging.getLogger(__name__)
def __init__(self, csv_row: list, header_numbers: dict) -> None:
self.logger.debug(f"init Reading, row: {csv_row}")
self.data_dict = {
"KeszulekAzon": "",
"Gyariszam": "",
"MeroAllas": "",
"MertekEgyseg": "m3",
"LeoMod": "20",
"LeoMegjegyzes": "06",
"TenyLeoDatum": "",
"TenyLeoIdo": ""
}
self.valid = True
for col_name, col_num in header_numbers.items():
try:
if col_name == "Notes":
self.data_dict["KeszulekAzon"] = csv_row[col_num]
elif col_name == "Meter n°":
self.data_dict["Gyariszam"] = csv_row[col_num]
elif col_name == "Reading":
self.data_dict["MeroAllas"] = csv_row[col_num]\
.replace(',', '.').replace(' ', '')
elif col_name == "Reading data":
reading_date = datetime.datetime.strptime(
csv_row[col_num], '%d. %m. %Y %H:%M:%S')
self.data_dict["TenyLeoDatum"] = reading_date.strftime(
'%Y%m%d')
self.data_dict["TenyLeoIdo"] = reading_date.strftime('%H%M%S')
except Exception as e:
self.valid = False
self.logger.warn(f"Malformed Reading column: {col_name}: {csv_row[col_num]}")
self.logger.debug(f"Exception: {str(e)}")
if not self.valid:
self.logger.info(f"Invalid reading found: {self.data_dict}")
def is_valid(self) -> bool:
return self.valid

View File

@@ -0,0 +1,62 @@
import logging
import datetime
from pathlib import Path
import xml.etree.ElementTree as ET
import xmlformatter
from BudaportaFvmConvert.Reading import Reading
# format: date, number
FILENAMETEMPLATE = "SZLA_{}_{}.XML"
class XmlFile:
logger = logging.getLogger(__name__)
def __init__(self, file_date: datetime.datetime, file_dir: Path) -> None:
file_date_str = file_date.strftime('%Y%m%d%H%M%S')
file_num = 1
self.file_path = None
while not self.file_path:
file_name = FILENAMETEMPLATE.format(
file_date_str, str(file_num).zfill(2))
file_path = file_dir.joinpath(file_name)
if file_path.exists():
file_num = file_num+1
else:
self.file_path = file_path
if not self.file_path:
raise Exception("File name not found!")
self.logger.debug(f"File name found: {self.file_path}")
# Init xml root:
self.xml_root = ET.Element('Leolvasasok')
def add_reading(self, reading: Reading) -> None:
current_subelement = ET.SubElement(self.xml_root, 'Leolvasas')
for data_key, data in reading.data_dict.items():
a = ET.SubElement(current_subelement, data_key)
a.text = data
def get_xml_string(self) -> bytes:
xml_string = ET.tostring(
self.xml_root, encoding="utf-8", xml_declaration=True)
formatter = xmlformatter.Formatter(indent=2, indent_char=" ")
formatted_string_bytes = formatter.format_string(xml_string)
# Fix line ending:
if not b'\r\n' in formatted_string_bytes:
formatted_string_bytes = \
formatted_string_bytes.replace(b'\n', b'\r\n')
return formatted_string_bytes
def save_xml_file(self) -> None:
if self.file_path:
with open(self.file_path, mode="xb") as f:
f.write(self.get_xml_string())

View File

@@ -3,10 +3,12 @@
import argparse import argparse
from importlib.metadata import metadata from importlib.metadata import metadata
from pathlib import Path from pathlib import Path
import csv
import datetime import datetime
import logging
import sys
REQUIRED_CSV_COLUMNS = ['Notes', 'Meter n°', 'Reading', 'Reading data'] from BudaportaFvmConvert.XmlFile import XmlFile
from BudaportaFvmConvert.CsvFile import CsvFile
METADATA = metadata("BudaportaFvmConvert") METADATA = metadata("BudaportaFvmConvert")
@@ -26,72 +28,37 @@ def run():
parser.add_argument("source", parser.add_argument("source",
type=Path, type=Path,
help="Path to a csv file or a folder", help="Path to a csv file or a folder"
nargs="+"
) )
args = parser.parse_args() args = parser.parse_args()
# Start logging:
logging.basicConfig(level=logging.INFO)
logging.debug(f"Arguments: {vars(args)}")
csvpaths = [] csvpaths = []
output_dir = Path()
if args.source.is_dir():
csvpaths.extend(args.source.glob('**/*.csv'))
output_dir = args.source
elif args.source.suffix == ".csv":
csvpaths.append(args.source)
output_dir = args.source.parent
if not csvpaths:
sys.exit("No csv file found!")
# Get Readings from csvfiles:
readings = [] readings = []
for path in args.source:
if path.is_dir():
csvpaths.extend(get_csv_paths(path))
elif path.suffix == ".csv":
csvpaths.append(path)
for csvpath in csvpaths: for csvpath in csvpaths:
readings.extend(CsvFile(csvpath).get_readings()) readings.extend(CsvFile(csvpath).get_readings())
xml_file = XmlFile(datetime.datetime.now(), output_dir)
def get_csv_paths(dp: Path): for reading in readings:
paths = [] if reading.is_valid():
if dp.is_dir(): xml_file.add_reading(reading)
for p in dp.iterdir():
if p.suffix == ".csv":
paths.append(p)
return paths
xml_file.save_xml_file()
class CsvFile:
def __init__(self, filepath: Path) -> None:
self.header_numbers = {}
self.readings = []
with open(filepath, newline='', encoding="cp1252") as csvfile:
incsv = list(csv.reader(csvfile, delimiter=';'))
header_row = incsv.pop(0)
for i, col_name in enumerate(header_row):
if col_name in REQUIRED_CSV_COLUMNS:
self.header_numbers[col_name] = i
for csv_row in incsv:
self.readings.append(Reading(csv_row, self.header_numbers))
def get_readings(self) -> list["Reading"]:
return self.readings
class Reading:
def __init__(self, csv_row: list, header_numbers: dict) -> None:
self.data_dict = {
"KeszulekAzon": "",
"Gyariszam": "",
"MeroAllas": "",
"MertekEgyseg": "m3",
"LeoMod": "20",
"LeoMegjegyzes": "06",
"TenyLeoDatum": "",
"TenyLeoIdo": ""
}
for col_name, col_num in header_numbers.items():
if col_name == "Notes":
self.data_dict["KeszulekAzon"] = csv_row[col_num]
elif col_name == "Meter n°":
self.data_dict["Gyariszam"] = csv_row[col_num]
elif col_name == "Reading":
self.data_dict["MeroAllas"] = csv_row[col_num]\
.replace(',', '.').replace(' ', '')
elif col_name == "Reading data":
reading_date = datetime.datetime.strptime(
csv_row[col_num], '%d. %m. %Y %H:%M:%S')
self.data_dict["TenyLeoDatum"] = reading_date.strftime('%Y%m%d')
self.data_dict["TenyLeoIdo"] = reading_date.strftime('%H%M%S')