wmbusmeters/utils/kem-import.py

229 wiersze
10 KiB
Python

# A Python script that decrypts the content of Kamstrup KEM file and imports meter files
# to wmbusmeters' config folder.
#
# The KEM file is a (sometimes zipped) xml file that contains xml-encrypted data using
# the xml-enc standard (http://www.w3.org/2001/04/xmlenc). The password needed to decrypt
# the xml-encrypted data can either be the CustomerId or something else selected by the
# person that has created the KEM file using Kamstrup software.
#
# This script takes the encrypted KEM file and decrypts its content (it automaticly detects
# the zip archive and extracts the kem file from it). The result is a XML with a list of meters
# with their types, serial numbers, keys, etc. The script prints the information about each meter
# to the console and populates wmbusmeters' config folder with corresponding meter files.
# Optionally, decrypted KEM file content can be saved to a file (option -o).
#
# Usage: python kem-import.py [options] <kem_file> <password>
# kem_file ... the name of the KEM file to decrypt or a zip archive having the KEM file in it
# password ... original password used to encrypt the content (16 characters maximum)
# options ... use -h switch to get a list of options with their descriptions
#
# The script is Python2/Python3 compatible.
#
from __future__ import print_function
import os
import sys
import re
import errno
import argparse
import base64
import zipfile
from xml.dom import minidom
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
help_prog = "Decrypts Kamstrup KEM file and imports meter information into wmbusmeters' config folder."
help_epilog = ""
help_kem = "The name of the KEM file to decrypt or a name of a zip archive that contains the encrypted KEM file."
help_pwd = "The original password used to encrypt the KEM file content (16 characters maximum)."
help_cfg = """Location of config files for wmbusmeters (default location is the current working directory).
This option has the same meaning as --useconfig option of the wmbusmeters daemon, i.e. --useconfig=/ will
populate /etc/wmbusmeters.d folder (must be run with sudo) and --useconfig=. (the default) populates
./etc/wmbusmeters.d folder. If the destination folder does not exist, it will be created; existing meter
files will be overwritten."""
help_dry = "No meter files will be created, only the info will be printed on the console."
help_out = "Save the decrypted KEM file content into a given file."
# define command line arguments
argparser = argparse.ArgumentParser(description=help_prog, epilog=help_epilog)
argparser.add_argument('kem_file', help=help_kem)
argparser.add_argument('password', help=help_pwd)
argparser.add_argument("-c", "--useconfig", type=str, action='store', dest='config', default=os.getcwd(), help=help_cfg)
argparser.add_argument("-n", "--dryrun", action='store_true', dest='dryrun', help=help_dry)
argparser.add_argument("-o", "--output", type=str, action='store', dest='output', help=help_out)
# parse command line arguments
args = argparser.parse_args()
# adjust the config folder location to full target path
args.config = args.config.strip(os.path.sep) + '/etc/wmbusmeters.d/'
# test if input file exists
if (not os.path.isfile(args.kem_file)):
print('ERROR: The input KEM file does not exist.')
sys.exit(1)
#end if
kem_file_content = None
# test if input file is a zip file
# yes: find a file with .kem extension in the zip and extract content of that file
# no : load the content of the input file directly
if (zipfile.is_zipfile(args.kem_file)):
print("Detected a zip file on input ... extracting")
with zipfile.ZipFile(args.kem_file,'r') as zipobj:
file_list = zipobj.namelist()
for file_name in file_list:
if file_name.endswith('.kem') or file_name.endswith('.kem2'):
kem_file_content = zipobj.read(file_name)
break
#end if
#end for
if (not kem_file_content):
print("ERROR: The zip file '%s' does not seem to contain any '.kem' file." % (args.kem_file))
sys.exit(1)
#end if
else:
# read content of the kem file
with open(args.kem_file,'r') as f:
kem_file_content = f.read()
#end if
# read and parse KEM file content and extract its encrypted part
# (KEM file is a a XML file formated according to http://www.w3.org/2001/04/xmlenc)
try:
xmldoc = minidom.parseString(kem_file_content)
encrypedtext = xmldoc.getElementsByTagName('CipherValue')[0].firstChild.nodeValue
encrypeddata = base64.b64decode(encrypedtext)
except:
print('ERROR: The KEM file does not seem to contain encrypted data.')
sys.exit(1)
# KEM file data is encrypted with AES-128-CBC cipher and decryption
# requires a 128bit key and 128bit initialization vector. In the case of KEM file,
# the initialization vector is the same as the key. If password length is less
# than 16 characters (128 bits), we pad the key with zeros up to its full length.
key = bytes(str(args.password).encode("utf-8"))
if (len(key) < 16): key += (16-len(key)) * b"\0"
# content decryption
backend = default_backend()
cipher = Cipher(algorithms.AES(key), modes.CBC(key), backend=backend)
decryptor = cipher.decryptor()
decryptedtext = decryptor.update(encrypeddata) + decryptor.finalize()
try:
decodedtext = decryptedtext.decode('utf-8')
except UnicodeDecodeError:
print("ERROR: Looks like password is wrong - decryption failed!")
sys.exit(1)
# save decrypted XML file if requested
if (args.output):
f = open(args.output, 'wb')
f.write(decryptedtext)
f.close()
#end if
# create the folder for meter files if it does not exists
if (not args.dryrun) and (not os.path.exists(args.config)):
print("Creating target folder:", args.config)
try:
os.makedirs(args.config)
except OSError as e:
if (e.errno in [errno.EPERM, errno.EACCES]):
print(e)
print("You may need to use 'sudo' to access the target config folder.")
sys.exit(1)
else:
raise
#end if
def print_meter(meterName,meterType,meterNum,meterSerial,meterVendor,meterConfig,meterModel,meterKey):
# meter model identification
# CONTRIBUTING NOTE: additional meter types supported by wmbusmeters can be put here
# if their identification in KEM file is known
if (meterName == 'MC302') and (meterModel.startswith('302T')):
wmbusmeters_driver = 'multical302'
elif (meterName == 'MC303') and (meterModel.startswith('303')):
wmbusmeters_driver = 'multical303'
elif (meterName == 'MC403') and (meterModel.startswith('403')):
wmbusmeters_driver = 'multical403'
elif (meterName == 'MC21') and (meterModel.startswith('021')):
wmbusmeters_driver = 'multical21'
elif (meterName == 'MC603') and (meterModel.startswith('603')):
wmbusmeters_driver = 'multical603'
elif (meterName == 'KWM2210'):
wmbusmeters_driver = 'flowiq2200'
else:
wmbusmeters_driver = None
# print info to console
print('Found meter', meterName, '('+meterModel+')')
print(' number :', meterNum)
print(' serial :', meterSerial)
print(' type :', meterType)
print(' driver :', wmbusmeters_driver)
print(' config :', meterConfig)
print(' key :', meterKey)
if (not args.dryrun):
# save meter file in config folder if meter is supported by wmbusmeters
if (wmbusmeters_driver is not None):
try:
f = open(args.config+meterSerial, 'w')
f.write("name=%s\n" % (meterNum))
f.write("driver=%s\n" % (wmbusmeters_driver))
f.write("id=%s\n" % (meterSerial))
f.write("key=%s\n" % (meterKey))
f.close()
print(' meter file:', args.config+meterSerial)
except (IOError, OSError) as e:
if (e.errno in [errno.EPERM, errno.EACCES]):
print(e)
print("You may need to use 'sudo' to access the target config folder.")
sys.exit(1)
else:
raise
else:
print(' !! unknow meter type, meter file has not been created')
#end if
#end for
# parse the decrypted KEM file content and create corresponding meter files in
# wmbusmeters' config folder for each meter found in the XML file
trimmedText = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\xff]', '', decryptedtext.decode('utf-8'))
xmldoc = minidom.parseString(trimmedText)
if xmldoc.documentElement.tagName == "MetersInOrder":
for e in xmldoc.getElementsByTagName('Meter'):
# read information from source XML file
meterName = e.getElementsByTagName('MeterName')[0].firstChild.nodeValue
meterType = e.getElementsByTagName('ConsumptionType')[0].firstChild.nodeValue
meterNum = e.getElementsByTagName('MeterNo')[0].firstChild.nodeValue
meterSerial = e.getElementsByTagName('SerialNo')[0].firstChild.nodeValue
meterVendor = e.getElementsByTagName('VendorId')[0].firstChild.nodeValue
meterConfig = e.getElementsByTagName('ConfigNo')[0].firstChild.nodeValue
meterModel = e.getElementsByTagName('TypeNo')[0].firstChild.nodeValue
meterKey = e.getElementsByTagName('DEK')[0].firstChild.nodeValue
print_meter(meterName,meterType,meterNum,meterSerial,meterVendor,meterConfig,meterModel,meterKey)
elif xmldoc.documentElement.tagName == "Devices":
for e in xmldoc.getElementsByTagName('Device'):
# read information from source XML file
meterName = e.getElementsByTagName('ShortName')[0].firstChild.nodeValue
meterType = e.getElementsByTagName('ConsumptionTypeName')[0].firstChild.nodeValue
meterNum = e.getElementsByTagName('CustomerDeviceNumber')[0].firstChild.nodeValue
meterSerial = e.getElementsByTagName('SerialNumber')[0].firstChild.nodeValue
meterVendor = e.getElementsByTagName('ManufacturerId')[0].firstChild.nodeValue
meterConfig = e.getElementsByTagName('ConfigNumber')[0].firstChild.nodeValue
meterModel = e.getElementsByTagName('TypeNumber')[0].firstChild.nodeValue
meterKey = e.getElementsByTagName('Value')[0].firstChild.nodeValue
print_meter(meterName,meterType,meterNum,meterSerial,meterVendor,meterConfig,meterModel,meterKey)
else:
print("ERROR: Unable to extract details from file")
sys.exit(1)