206 lines
5.9 KiB
Python
206 lines
5.9 KiB
Python
# BudaPortaXmlConvert
|
|
# Version: 1.1
|
|
# License: GNU LGPLv3
|
|
# Maintainer: Peter Gyetvai - gyetpet@mailbox.org
|
|
# Repo: https://git.gyetvaipeter.hu/infeeeee/budaporta-xml
|
|
|
|
# ---------------------------------- import ---------------------------------- #
|
|
|
|
import sys
|
|
import xml.etree.ElementTree as ET
|
|
import csv
|
|
import datetime
|
|
import os
|
|
|
|
|
|
# ---------------------------------------------------------------------------- #
|
|
# Function #
|
|
# ---------------------------------------------------------------------------- #
|
|
|
|
# Finds the number of column
|
|
def findColumn(text, array):
|
|
for i, elem in enumerate(array):
|
|
if elem == text:
|
|
return i
|
|
raise ValueError('Oszlop nem található!' + text)
|
|
|
|
# Finds all column numbers, returns an array of positions
|
|
|
|
|
|
def findColumnNumbers(csvHeaderArray, requiredHeaders):
|
|
colNumbers = []
|
|
for col in requiredHeaders:
|
|
colnum = findColumn(col, csvHeaderArray)
|
|
print(f'"{col}" oszlop sorszáma: {colnum}')
|
|
colNumbers.append(colnum)
|
|
print()
|
|
return colNumbers
|
|
|
|
# Gets the data from all rows, based on column numbers:
|
|
|
|
|
|
def getReqData(csvRows, colNumbers):
|
|
outData = []
|
|
for row in csvRows:
|
|
dict1 = {}
|
|
for i, reqCol in enumerate(colNumbers):
|
|
dict1[requiredData[i]] = row[reqCol]
|
|
outData.append(dict1)
|
|
return outData
|
|
|
|
|
|
def getXmlData(data):
|
|
xmlData = []
|
|
for dict in data:
|
|
theDate = datetime.datetime.strptime(
|
|
dict["Reading data"], '%d. %m. %Y %H:%M:%S')
|
|
xmlDict = {
|
|
"KeszulekAzon": dict["Notes"],
|
|
"Gyariszam": dict["Meter n°"],
|
|
"MeroAllas": dict["Reading"].replace(',', '.').replace(' ', ''),
|
|
"MertekEgyseg": "m3",
|
|
"LeoMod": "20",
|
|
"LeoMegjegyzes": "06",
|
|
"TenyLeoDatum": theDate.strftime('%Y%m%d'),
|
|
"TenyLeoIdo": theDate.strftime('%H%M%S')
|
|
}
|
|
xmlData.append(xmlDict)
|
|
return xmlData
|
|
|
|
|
|
def checkData(xmlData):
|
|
print()
|
|
missingData = False
|
|
for mérő in xmlData:
|
|
for adat in mérő:
|
|
if len(mérő[adat]) == 0:
|
|
missingData = True
|
|
print(f'Adat hiányzik: "{adat}" a következő sorból:')
|
|
print(mérő)
|
|
return missingData
|
|
|
|
# ---------------------------------------------------------------------------- #
|
|
# Code #
|
|
# ---------------------------------------------------------------------------- #
|
|
|
|
|
|
requiredData = ['Notes', 'Meter n°', 'Reading', 'Reading data']
|
|
|
|
# --------------------------------- Read arg --------------------------------- #
|
|
if len(sys.argv) != 2:
|
|
raise ValueError('Útvonal hiányzik!')
|
|
csvPath = sys.argv[1]
|
|
print(f'Útvonal olvasása: {csvPath}')
|
|
folder = os.path.dirname(csvPath)
|
|
|
|
# --------------------------------- Read csv --------------------------------- #
|
|
|
|
csvArrs = []
|
|
csvFileNames = []
|
|
if os.path.isdir(csvPath):
|
|
folder = csvPath
|
|
print(f'Fájlok a mappában: {os.listdir(csvPath)}')
|
|
files = os.listdir(csvPath)
|
|
for file in os.listdir(csvPath):
|
|
if not os.path.splitext(file)[1] == '.csv':
|
|
files.remove(file)
|
|
print(f'Csv fájlok a mappában: {files}')
|
|
for i, file in enumerate(files):
|
|
csvFileNames.append(file)
|
|
with open(os.path.join(csvPath, file), newline='') as csvfile:
|
|
incsv = csv.reader(csvfile, delimiter=';')
|
|
csvArrs.append([])
|
|
for row in incsv:
|
|
csvArrs[i].append(row)
|
|
|
|
else:
|
|
csvFileNames.append(csvPath)
|
|
with open(csvPath, newline='') as csvfile:
|
|
incsv = csv.reader(csvfile, delimiter=';')
|
|
csvArrs.append([])
|
|
for row in incsv:
|
|
csvArrs[0].append(row)
|
|
|
|
|
|
# ---------------------------- Find column numbers --------------------------- #
|
|
|
|
# Get the number of each column:
|
|
dataColNumbers = []
|
|
for i, table in enumerate(csvArrs):
|
|
print(f'Oszlopok keresése: {csvFileNames[i]}:')
|
|
dataColNumbers.append(findColumnNumbers(table[0], requiredData))
|
|
|
|
# ----------------------------- Get required data ---------------------------- #
|
|
|
|
# Remove headers from csv:
|
|
for table in csvArrs:
|
|
table.pop(0)
|
|
|
|
# Get data based on column numbers:
|
|
reqData = []
|
|
for i, table in enumerate(csvArrs):
|
|
reqData.append(getReqData(table, dataColNumbers[i]))
|
|
|
|
# ---------------- Convert to required formats and check data ---------------- #
|
|
|
|
# get only required for xml, fill static values
|
|
xmlReqData = []
|
|
for i, table in enumerate(reqData):
|
|
print(f'Fájl ellenőrzése: {csvFileNames[i]}')
|
|
currXmlData = getXmlData(table)
|
|
if not checkData(currXmlData):
|
|
print('Fájl hibátlan')
|
|
print()
|
|
xmlReqData.append(currXmlData)
|
|
|
|
# print(xmlReqData)
|
|
# for i in xmlReqData:
|
|
# print(i)
|
|
|
|
|
|
# ------------------------------- Merge arrays ------------------------------- #
|
|
|
|
allXmlData = []
|
|
for array in xmlReqData:
|
|
for mérő in array:
|
|
allXmlData.append(mérő)
|
|
|
|
for i in allXmlData:
|
|
print(i)
|
|
|
|
# ----------------------------- Generate filename ---------------------------- #
|
|
|
|
|
|
now = datetime.datetime.now()
|
|
fileNameBase = 'SZLA_' + now.strftime('%Y%m%d%H%M%S') + '_'
|
|
fileNameNotFound = True
|
|
fileNum = 1
|
|
while fileNameNotFound:
|
|
fileName = os.path.join(folder, fileNameBase +
|
|
str(fileNum).zfill(2) + '.XML')
|
|
fileNum = fileNum+1
|
|
if not os.path.exists(fileName):
|
|
fileNameNotFound = False
|
|
|
|
|
|
print(f'Fájl mentése ide: {fileName}')
|
|
print()
|
|
|
|
# --------------------------------- Write xml -------------------------------- #
|
|
|
|
data = ET.Element('Leolvasasok')
|
|
data.tail = '\r\n'
|
|
for mérő in allXmlData:
|
|
currSub = ET.SubElement(data, 'Leolvasas')
|
|
currSub.tail = '\r\n'
|
|
for adat in mérő:
|
|
a = ET.SubElement(currSub, adat)
|
|
a.text = mérő[adat]
|
|
a.tail = '\r\n'
|
|
|
|
print(ET.dump(data))
|
|
print()
|
|
|
|
tree = ET.ElementTree(data)
|
|
tree.write(fileName, encoding="utf-8", xml_declaration=True)
|