Compare commits
7 Commits
4a6d34da4d
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 809e20cb80 | |||
| 398738f8db | |||
| cb82eed2f0 | |||
| 512cb17264 | |||
| fb6ca1da51 | |||
| 141414e2b4 | |||
| 0209741dcf |
4
.gitignore
vendored
4
.gitignore
vendored
@@ -1,2 +1,4 @@
|
||||
.venv
|
||||
*.egg-info
|
||||
__pycache__
|
||||
example
|
||||
example-csv
|
||||
|
||||
@@ -11,6 +11,7 @@ import xml.etree.ElementTree as ET
|
||||
import csv
|
||||
import datetime
|
||||
import os
|
||||
import xmlformatter
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------- #
|
||||
@@ -70,11 +71,11 @@ def getXmlData(data):
|
||||
|
||||
def checkData(xmlData):
|
||||
print()
|
||||
missingData = False
|
||||
for mérő in xmlData:
|
||||
missingData = []
|
||||
for i, mérő in enumerate(xmlData):
|
||||
for adat in mérő:
|
||||
if len(mérő[adat]) == 0:
|
||||
missingData = True
|
||||
if len(mérő[adat]) == 0 or mérő[adat] == "diktalos":
|
||||
missingData.append(i)
|
||||
print(f'Adat hiányzik: "{adat}" a következő sorból:')
|
||||
print(mérő)
|
||||
return missingData
|
||||
@@ -100,20 +101,24 @@ csvFileNames = []
|
||||
if os.path.isdir(csvPath):
|
||||
folder = csvPath
|
||||
print(f'Fájlok a mappában: {os.listdir(csvPath)}')
|
||||
for i, file in enumerate(os.listdir(csvPath)):
|
||||
files = os.listdir(csvPath)
|
||||
for file in os.listdir(csvPath):
|
||||
if not os.path.splitext(file)[1] == '.csv':
|
||||
files.remove(file)
|
||||
print(f'Csv fájlok a mappában: {files}')
|
||||
for i, file in enumerate(files):
|
||||
csvFileNames.append(file)
|
||||
with open(os.path.join(csvPath, file), newline='') as csvfile:
|
||||
incsv = csv.reader(csvfile, delimiter=';')
|
||||
csvArrs.append([])
|
||||
|
||||
for row in incsv:
|
||||
csvArrs[i].append(row)
|
||||
|
||||
else:
|
||||
csvFileNames[0] = csvPath
|
||||
csvFileNames.append(csvPath)
|
||||
with open(csvPath, newline='') as csvfile:
|
||||
incsv = csv.reader(csvfile, delimiter=';')
|
||||
csvArrs[0] = []
|
||||
csvArrs.append([])
|
||||
for row in incsv:
|
||||
csvArrs[0].append(row)
|
||||
|
||||
@@ -143,16 +148,22 @@ for i, table in enumerate(csvArrs):
|
||||
xmlReqData = []
|
||||
for i, table in enumerate(reqData):
|
||||
print(f'Fájl ellenőrzése: {csvFileNames[i]}')
|
||||
|
||||
# unwrap data
|
||||
currXmlData = getXmlData(table)
|
||||
if not checkData(currXmlData):
|
||||
|
||||
# check for missing data
|
||||
wrongLines = checkData(currXmlData)
|
||||
if len(wrongLines) == 0:
|
||||
print('Fájl hibátlan')
|
||||
else:
|
||||
# remove bad lines
|
||||
print(wrongLines)
|
||||
for ele in sorted(wrongLines, reverse=True):
|
||||
del currXmlData[ele]
|
||||
print()
|
||||
xmlReqData.append(currXmlData)
|
||||
|
||||
# print(xmlReqData)
|
||||
# for i in xmlReqData:
|
||||
# print(i)
|
||||
|
||||
|
||||
# ------------------------------- Merge arrays ------------------------------- #
|
||||
|
||||
@@ -178,7 +189,6 @@ while fileNameNotFound:
|
||||
if not os.path.exists(fileName):
|
||||
fileNameNotFound = False
|
||||
|
||||
|
||||
print(f'Fájl mentése ide: {fileName}')
|
||||
print()
|
||||
|
||||
@@ -191,8 +201,24 @@ for mérő in allXmlData:
|
||||
a = ET.SubElement(currSub, adat)
|
||||
a.text = mérő[adat]
|
||||
|
||||
print(ET.dump(data))
|
||||
print()
|
||||
xmlString = ET.tostring(data, encoding="utf-8", xml_declaration=True)
|
||||
|
||||
tree = ET.ElementTree(data)
|
||||
tree.write(fileName)
|
||||
# -------------------------------- Format xml -------------------------------- #
|
||||
|
||||
formatter = xmlformatter.Formatter(indent=2, indent_char=" ")
|
||||
formattedXml = formatter.format_string(xmlString)
|
||||
|
||||
# print(formattedXml)
|
||||
|
||||
# ----------------------------- Fix line endings ----------------------------- #
|
||||
|
||||
WINDOWS_LINE_ENDING = b'\r\n'
|
||||
UNIX_LINE_ENDING = b'\n'
|
||||
|
||||
formattedXml = formattedXml.replace(UNIX_LINE_ENDING, WINDOWS_LINE_ENDING)
|
||||
|
||||
# --------------------------------- Save file -------------------------------- #
|
||||
|
||||
f = open(fileName, "xb")
|
||||
f.write(formattedXml)
|
||||
f.close
|
||||
|
||||
29
BudaportaFvmConvert/CsvFile.py
Normal file
29
BudaportaFvmConvert/CsvFile.py
Normal file
@@ -0,0 +1,29 @@
|
||||
import csv
|
||||
from pathlib import Path
|
||||
import logging
|
||||
|
||||
from BudaportaFvmConvert.Reading import Reading
|
||||
|
||||
|
||||
REQUIRED_CSV_COLUMNS = ['Notes', 'Meter n°', 'Reading', 'Reading data']
|
||||
|
||||
|
||||
class CsvFile:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def __init__(self, filepath: Path) -> None:
|
||||
|
||||
self.header_numbers = {}
|
||||
self.readings = []
|
||||
with open(filepath, newline='', encoding="cp1252") as csvfile:
|
||||
incsv = list(csv.reader(csvfile, delimiter=';'))
|
||||
header_row = incsv.pop(0)
|
||||
for i, col_name in enumerate(header_row):
|
||||
if col_name in REQUIRED_CSV_COLUMNS:
|
||||
self.header_numbers[col_name] = i
|
||||
self.logger.debug(f"header numbers: {self.header_numbers}")
|
||||
for csv_row in incsv:
|
||||
self.readings.append(Reading(csv_row, self.header_numbers))
|
||||
|
||||
def get_readings(self) -> list[Reading]:
|
||||
return self.readings
|
||||
51
BudaportaFvmConvert/Reading.py
Normal file
51
BudaportaFvmConvert/Reading.py
Normal file
@@ -0,0 +1,51 @@
|
||||
import logging
|
||||
import datetime
|
||||
|
||||
|
||||
|
||||
class Reading:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def __init__(self, csv_row: list, header_numbers: dict) -> None:
|
||||
self.logger.debug(f"init Reading, row: {csv_row}")
|
||||
self.data_dict = {
|
||||
"KeszulekAzon": "",
|
||||
"Gyariszam": "",
|
||||
"MeroAllas": "",
|
||||
"MertekEgyseg": "m3",
|
||||
"LeoMod": "20",
|
||||
"LeoMegjegyzes": "06",
|
||||
"TenyLeoDatum": "",
|
||||
"TenyLeoIdo": ""
|
||||
}
|
||||
self.valid = True
|
||||
|
||||
|
||||
for col_name, col_num in header_numbers.items():
|
||||
try:
|
||||
if col_name == "Notes":
|
||||
self.data_dict["KeszulekAzon"] = csv_row[col_num]
|
||||
elif col_name == "Meter n°":
|
||||
self.data_dict["Gyariszam"] = csv_row[col_num]
|
||||
elif col_name == "Reading":
|
||||
self.data_dict["MeroAllas"] = csv_row[col_num]\
|
||||
.replace(',', '.').replace(' ', '')
|
||||
elif col_name == "Reading data":
|
||||
reading_date = datetime.datetime.strptime(
|
||||
csv_row[col_num], '%d. %m. %Y %H:%M:%S')
|
||||
self.data_dict["TenyLeoDatum"] = reading_date.strftime(
|
||||
'%Y%m%d')
|
||||
self.data_dict["TenyLeoIdo"] = reading_date.strftime('%H%M%S')
|
||||
|
||||
except Exception as e:
|
||||
self.valid = False
|
||||
self.logger.warn(f"Malformed Reading column: {col_name}: {csv_row[col_num]}")
|
||||
self.logger.debug(f"Exception: {str(e)}")
|
||||
|
||||
if not self.valid:
|
||||
self.logger.info(f"Invalid reading found: {self.data_dict}")
|
||||
|
||||
def is_valid(self) -> bool:
|
||||
return self.valid
|
||||
|
||||
|
||||
62
BudaportaFvmConvert/XmlFile.py
Normal file
62
BudaportaFvmConvert/XmlFile.py
Normal file
@@ -0,0 +1,62 @@
|
||||
import logging
|
||||
import datetime
|
||||
from pathlib import Path
|
||||
import xml.etree.ElementTree as ET
|
||||
import xmlformatter
|
||||
|
||||
|
||||
from BudaportaFvmConvert.Reading import Reading
|
||||
|
||||
# format: date, number
|
||||
FILENAMETEMPLATE = "SZLA_{}_{}.XML"
|
||||
|
||||
|
||||
class XmlFile:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def __init__(self, file_date: datetime.datetime, file_dir: Path) -> None:
|
||||
|
||||
file_date_str = file_date.strftime('%Y%m%d%H%M%S')
|
||||
file_num = 1
|
||||
self.file_path = None
|
||||
|
||||
while not self.file_path:
|
||||
file_name = FILENAMETEMPLATE.format(
|
||||
file_date_str, str(file_num).zfill(2))
|
||||
file_path = file_dir.joinpath(file_name)
|
||||
if file_path.exists():
|
||||
file_num = file_num+1
|
||||
else:
|
||||
self.file_path = file_path
|
||||
|
||||
if not self.file_path:
|
||||
raise Exception("File name not found!")
|
||||
|
||||
self.logger.debug(f"File name found: {self.file_path}")
|
||||
|
||||
# Init xml root:
|
||||
self.xml_root = ET.Element('Leolvasasok')
|
||||
|
||||
def add_reading(self, reading: Reading) -> None:
|
||||
current_subelement = ET.SubElement(self.xml_root, 'Leolvasas')
|
||||
for data_key, data in reading.data_dict.items():
|
||||
a = ET.SubElement(current_subelement, data_key)
|
||||
a.text = data
|
||||
|
||||
def get_xml_string(self) -> bytes:
|
||||
xml_string = ET.tostring(
|
||||
self.xml_root, encoding="utf-8", xml_declaration=True)
|
||||
formatter = xmlformatter.Formatter(indent=2, indent_char=" ")
|
||||
formatted_string_bytes = formatter.format_string(xml_string)
|
||||
|
||||
# Fix line ending:
|
||||
if not b'\r\n' in formatted_string_bytes:
|
||||
formatted_string_bytes = \
|
||||
formatted_string_bytes.replace(b'\n', b'\r\n')
|
||||
|
||||
return formatted_string_bytes
|
||||
|
||||
def save_xml_file(self) -> None:
|
||||
if self.file_path:
|
||||
with open(self.file_path, mode="xb") as f:
|
||||
f.write(self.get_xml_string())
|
||||
64
BudaportaFvmConvert/__init__.py
Normal file
64
BudaportaFvmConvert/__init__.py
Normal file
@@ -0,0 +1,64 @@
|
||||
# import xml.etree.ElementTree as ET
|
||||
# import xmlformatter
|
||||
import argparse
|
||||
from importlib.metadata import metadata
|
||||
from pathlib import Path
|
||||
import datetime
|
||||
import logging
|
||||
import sys
|
||||
|
||||
from BudaportaFvmConvert.XmlFile import XmlFile
|
||||
from BudaportaFvmConvert.CsvFile import CsvFile
|
||||
|
||||
|
||||
METADATA = metadata("BudaportaFvmConvert")
|
||||
__version__ = METADATA["Version"]
|
||||
|
||||
|
||||
def run():
|
||||
parser = argparse.ArgumentParser(
|
||||
prog=METADATA["Name"],
|
||||
description=METADATA["Summary"],
|
||||
)
|
||||
|
||||
parser.add_argument("-v", "--version",
|
||||
action="version",
|
||||
version=f'{METADATA["Name"]} {METADATA["Version"]}'
|
||||
)
|
||||
|
||||
parser.add_argument("source",
|
||||
type=Path,
|
||||
help="Path to a csv file or a folder"
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Start logging:
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logging.debug(f"Arguments: {vars(args)}")
|
||||
|
||||
csvpaths = []
|
||||
output_dir = Path()
|
||||
|
||||
if args.source.is_dir():
|
||||
csvpaths.extend(args.source.glob('**/*.csv'))
|
||||
output_dir = args.source
|
||||
elif args.source.suffix == ".csv":
|
||||
csvpaths.append(args.source)
|
||||
output_dir = args.source.parent
|
||||
|
||||
if not csvpaths:
|
||||
sys.exit("No csv file found!")
|
||||
|
||||
# Get Readings from csvfiles:
|
||||
readings = []
|
||||
for csvpath in csvpaths:
|
||||
readings.extend(CsvFile(csvpath).get_readings())
|
||||
|
||||
xml_file = XmlFile(datetime.datetime.now(), output_dir)
|
||||
|
||||
for reading in readings:
|
||||
if reading.is_valid():
|
||||
xml_file.add_reading(reading)
|
||||
|
||||
xml_file.save_xml_file()
|
||||
4
BudaportaFvmConvert/__main__.py
Normal file
4
BudaportaFvmConvert/__main__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from BudaportaFvmConvert import run
|
||||
|
||||
if __name__ == "__main__":
|
||||
run()
|
||||
16
README.md
16
README.md
@@ -2,6 +2,16 @@
|
||||
|
||||
Vízműveknek megfelelő xml konvertálása leolvasó programból kijövő csv fájlból.
|
||||
|
||||
## Telepítés
|
||||
|
||||
### Python-venv virtuális környezetben
|
||||
|
||||
```shell
|
||||
python -m venv .venv
|
||||
. ./.venv/bin/activate
|
||||
pip install -e .
|
||||
```
|
||||
|
||||
## Adatok megfeleltetése
|
||||
|
||||
| Fix érték | csv | xml |
|
||||
@@ -26,4 +36,10 @@ Egy xml fájl neve hossza legfeljebb 40 karakter lehet!
|
||||
A file nevében ne legyen ékezetes karakter.
|
||||
Egy sor sem lehet 50 karakternél hosszabb!
|
||||
|
||||
## Példa
|
||||
|
||||
Szerveren itt, linux útvonal:
|
||||
|
||||
```
|
||||
smb://bdc1.boffice.local/porta/Társasházkezelés/DOKUMENTUMTÁR/_VÍZ_ELSZÁMOLÁS_/ZSOLT%20UDVAR/Gépi%20kiolvasás%202020.12.03.tól/
|
||||
```
|
||||
33
pyproject.toml
Normal file
33
pyproject.toml
Normal file
@@ -0,0 +1,33 @@
|
||||
[project]
|
||||
name = "BudaportaFvmConvert"
|
||||
version = "2.0.0"
|
||||
description = "Convert meter readings from CSV to XML"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.8"
|
||||
license = {file = "LICENSE"}
|
||||
authors = [
|
||||
{name = "infeeeee", email = "gyetpet@mailbox.org"}
|
||||
]
|
||||
maintainers = [
|
||||
{name = "infeeeee", email = "gyetpet@mailbox.org"}
|
||||
]
|
||||
classifiers = [
|
||||
"Development Status :: 3 - Alpha",
|
||||
"Programming Language :: Python",
|
||||
]
|
||||
|
||||
dependencies = [
|
||||
"xmlformatter"
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
homepage = "https://git.gyetvaipeter.hu/infeeeee/BudaportaFvmConvert"
|
||||
documentation = "https://git.gyetvaipeter.hu/infeeeee/BudaportaFvmConvert"
|
||||
repository = "https://git.gyetvaipeter.hu/infeeeee/BudaportaFvmConvert"
|
||||
changelog = "https://git.gyetvaipeter.hu/infeeeee/BudaportaFvmConvert"
|
||||
|
||||
[project.scripts]
|
||||
BudaportaFvmConvert = "BudaportaFvmConvert:run"
|
||||
|
||||
[build-system]
|
||||
requires = ["setuptools", "wheel"]
|
||||
Reference in New Issue
Block a user