Merge pull request #938 from BIBOLV/master

Update kem import script
pull/943/head
Fredrik Öhrström 2023-04-11 09:21:49 +02:00 zatwierdzone przez GitHub
commit f1c64230db
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
2 zmienionych plików z 51 dodań i 25 usunięć

Wyświetl plik

@ -56,4 +56,4 @@ then
else
echo "Error when extracting. To debug run: bash -x kem-extract $*"
echo "Collect the output and create an issue at https://github.com/wmbusmeters/wmbusmeters"
fi
fi

Wyświetl plik

@ -27,9 +27,12 @@ import re
import errno
import argparse
import base64
import Crypto.Cipher.AES as AES
from xml.dom import minidom
import zipfile
from xml.dom import minidom
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
help_prog = "Decrypts Kamstrup KEM file and imports meter information into wmbusmeters' config folder."
help_epilog = ""
@ -74,7 +77,7 @@ if (zipfile.is_zipfile(args.kem_file)):
with zipfile.ZipFile(args.kem_file,'r') as zipobj:
file_list = zipobj.namelist()
for file_name in file_list:
if file_name.endswith('.kem'):
if file_name.endswith('.kem') or file_name.endswith('.kem2'):
kem_file_content = zipobj.read(file_name)
break
#end if
@ -109,12 +112,20 @@ key = bytes(str(args.password).encode("utf-8"))
if (len(key) < 16): key += (16-len(key)) * b"\0"
# content decryption
aes = AES.new(key, AES.MODE_CBC, IV=key)
decryptedtext = aes.decrypt(encrypeddata)
backend = default_backend()
cipher = Cipher(algorithms.AES(key), modes.CBC(key), backend=backend)
decryptor = cipher.decryptor()
decryptedtext = decryptor.update(encrypeddata) + decryptor.finalize()
try:
decodedtext = decryptedtext.decode('utf-8')
except UnicodeDecodeError:
print("ERROR: Looks like password is wrong - decryption failed!")
sys.exit(1)
# save decrypted XML file if requested
if (args.output):
f = open(args.output, 'w')
f = open(args.output, 'wb')
f.write(decryptedtext)
f.close()
#end if
@ -134,22 +145,7 @@ if (not args.dryrun) and (not os.path.exists(args.config)):
raise
#end if
# parse the decrypted KEM file content and create corresponding meter files in
# wmbusmeters' config folder for each meter found in the XML file
trimmedText = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\xff]', '', decryptedtext.decode('utf-8'))
xmldoc = minidom.parseString(trimmedText)
for e in xmldoc.getElementsByTagName('Meter'):
# read information from source XML file
meterName = e.getElementsByTagName('MeterName')[0].firstChild.nodeValue
meterType = e.getElementsByTagName('ConsumptionType')[0].firstChild.nodeValue
meterNum = e.getElementsByTagName('MeterNo')[0].firstChild.nodeValue
meterSerial = e.getElementsByTagName('SerialNo')[0].firstChild.nodeValue
meterVendor = e.getElementsByTagName('VendorId')[0].firstChild.nodeValue
meterConfig = e.getElementsByTagName('ConfigNo')[0].firstChild.nodeValue
meterModel = e.getElementsByTagName('TypeNo')[0].firstChild.nodeValue
meterKey = e.getElementsByTagName('DEK')[0].firstChild.nodeValue
def print_meter(meterName,meterType,meterNum,meterSerial,meterVendor,meterConfig,meterModel,meterKey):
# meter model identification
# CONTRIBUTING NOTE: additional meter types supported by wmbusmeters can be put here
# if their identification in KEM file is known
@ -167,7 +163,7 @@ for e in xmldoc.getElementsByTagName('Meter'):
wmbusmeters_driver = 'flowiq2200'
else:
wmbusmeters_driver = None
# print info to console
print('Found meter', meterName, '('+meterModel+')')
print(' number :', meterNum)
@ -200,4 +196,34 @@ for e in xmldoc.getElementsByTagName('Meter'):
#end if
#end for
# parse the decrypted KEM file content and create corresponding meter files in
# wmbusmeters' config folder for each meter found in the XML file
trimmedText = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\xff]', '', decryptedtext.decode('utf-8'))
xmldoc = minidom.parseString(trimmedText)
if xmldoc.documentElement.tagName == "MetersInOrder":
for e in xmldoc.getElementsByTagName('Meter'):
# read information from source XML file
meterName = e.getElementsByTagName('MeterName')[0].firstChild.nodeValue
meterType = e.getElementsByTagName('ConsumptionType')[0].firstChild.nodeValue
meterNum = e.getElementsByTagName('MeterNo')[0].firstChild.nodeValue
meterSerial = e.getElementsByTagName('SerialNo')[0].firstChild.nodeValue
meterVendor = e.getElementsByTagName('VendorId')[0].firstChild.nodeValue
meterConfig = e.getElementsByTagName('ConfigNo')[0].firstChild.nodeValue
meterModel = e.getElementsByTagName('TypeNo')[0].firstChild.nodeValue
meterKey = e.getElementsByTagName('DEK')[0].firstChild.nodeValue
print_meter(meterName,meterType,meterNum,meterSerial,meterVendor,meterConfig,meterModel,meterKey)
elif xmldoc.documentElement.tagName == "Devices":
for e in xmldoc.getElementsByTagName('Device'):
# read information from source XML file
meterName = e.getElementsByTagName('ShortName')[0].firstChild.nodeValue
meterType = e.getElementsByTagName('ConsumptionTypeName')[0].firstChild.nodeValue
meterNum = e.getElementsByTagName('CustomerDeviceNumber')[0].firstChild.nodeValue
meterSerial = e.getElementsByTagName('SerialNumber')[0].firstChild.nodeValue
meterVendor = e.getElementsByTagName('ManufacturerId')[0].firstChild.nodeValue
meterConfig = e.getElementsByTagName('ConfigNumber')[0].firstChild.nodeValue
meterModel = e.getElementsByTagName('TypeNumber')[0].firstChild.nodeValue
meterKey = e.getElementsByTagName('Value')[0].firstChild.nodeValue
print_meter(meterName,meterType,meterNum,meterSerial,meterVendor,meterConfig,meterModel,meterKey)
else:
print("ERROR: Unable to extract details from file")
sys.exit(1)