sr0wx/airly_sq9atk.py

113 wiersze
3.8 KiB
Python
Executable File

#!/usr/bin/python -tt
# -*- coding: utf-8 -*-
import urllib2
import logging
from datetime import datetime
import json as JSON
import socket
from sr0wx_module import SR0WXModule
class AirlySq9atk(SR0WXModule):
"""Klasa pobierająca dane o zanieszczyszczeniach"""
def __init__(self, language, api_key, lat, lon, service_url, mode, maxDistanceKM, installationId):
self.__language = language
self.__api_key = api_key
self.__lat = str(lat)
self.__lon = str(lon)
self.__service_url = service_url
self.__mode = mode
self.__maxDistanceKM = str(maxDistanceKM)
self.__installationId = str(installationId)
self.__logger = logging.getLogger(__name__)
self.__levels = {
'VERY_LOW': 'bardzo_dobry',
'LOW': 'dobry',
'MEDIUM': 'umiarkowany',
'HIGH': 'zly',
'VERY_HIGH':'bardzo_zly',
}
def get_data(self):
self.__logger.info("::: Pobieram dane o zanieczyszczeniach...")
api_service_url = self.prepareApiServiceUrl()
self.__logger.info( api_service_url )
jsonData = JSON.loads(self.getAirlyData(api_service_url))
self.__logger.info("::: Przetwarzam dane...\n")
message = "".join([
" _ ",
" informacja_o_skaz_eniu_powietrza ",
" _ ",
" godzina ",
self.getHour(),
" _ stan_ogolny ",
self.__levels[jsonData['current']['indexes'][0]['level']],
self.getPollutionLevel(jsonData['current']['values']),
" _ ",
])
return {
"message": message,
"source": "airly",
}
def getPollutionLevel(self, json):
message = '';
for item in json:
if item['name'] == 'PM1':
message += ' _ pyl__zawieszony_pm1 '
message += self.__language.read_micrograms( int(item['value']) ) + ' '
if item['name'] == 'PM25':
message += ' _ pyl__zawieszony_pm25 '
message += self.__language.read_micrograms( int(item['value']) ) + ' '
if item['name'] == 'PM10':
message += ' _ pyl__zawieszony_pm10 '
message += self.__language.read_micrograms( int(item['value']) ) + ' '
return message;
def prepareApiServiceUrl(self):
api_url = 'https://airapi.airly.eu/v2/measurements/'
urls = {
'installationId': api_url + 'installation?installationId=' + self.__installationId,
'point': api_url + 'point?lat=' + self.__lat + '&lng=' + self.__lon,
'nearest': api_url + 'nearest?lat=' + self.__lat + '&lng=' + self.__lon + '&maxDistanceKM=' + self.__maxDistanceKM,
};
return urls[self.__mode]
def getAirlyData(self, url):
request = urllib2.Request(url, headers={'Accept': 'application/json', 'apikey': self.__api_key})
try:
webFile = urllib2.urlopen(request, None, 30)
return webFile.read()
except urllib2.URLError, e:
print e
except socket.timeout:
print "Timed out!"
return ""
def getHour(self):
time = ":".join([str(datetime.now().hour), str(datetime.now().minute)])
datetime_object = datetime.strptime(time, '%H:%M')
msg = self.__language.read_datetime(datetime_object, '%H %M')
return msg
def getVisibility(self, value):
msg = ' _ ';
msg += ' widocznosc ' + self.__language.read_distance( int(value/1000) )
return msg