sr0wx/lib/taf.py

477 wiersze
21 KiB
Python
Executable File

#!/usr/bin/python -tt
# -*- coding: utf-8 -*-
# Copyright 2009-2011 Michal Sadowski (sq6jnx at hamradio dot pl)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import re, urllib
def between(a,b,c):
if (a<=b and b<=c) or (c<=b and b<=a):
return True
else:
return False
class taf:
tafData = []
weather = None
# Borrowed from pymetar.py, "slightly" modified. If element had not be translated type "%", ie.:
#
# "VC" : "%"
_WeatherConditions = {
"DZ" : {
"" : ["", "mzawka", ""],
"-" : ["slaba", "",""],
"+" : ["silna", "",""],
"VC" : ["", "", "w_poblizu"],
"MI" : ["niska", "",""],
"BC" : ["","mzawka",""],
"PR" : ["czesciowa", "",""],
"TS" : ["", "mzawka burza",""],
"BL" : ["", "", "z_zawieja"],
"SH" : ["przelotna", "",""],
"DR" : ["niska", "zamiec", ""],
"FZ" : ["marznaca","",""]
},
"RA" : {
"" : ["", "deszcz", ""],
"-" : ["lekki", "", ""],
"+" : ["silny", "", ""],
"VC" : ["", "", "w_poblizu"],
"MI" : ["niski", "", ""],
"BC" : ["", "deszcz", ""],
"PR" : ["czesciowy","", ""],
"TS" : ["","burza",""],
"BL" : ["","","z_zawieja"],
"SH" : ["przelotny", "", ""],
"DR" : ["niski","", ""],
"FZ" : ["marznacy","",""]
},
"SN" : {
"" : ["","snieg", ""],
"-" : ["lekki", "", ""],
"+" : ["silny", "", ""],
"VC" : ["", "", "w_poblizu"],
"MI" : ["niski", "",""],
"BC" : ["", "platy_sniegu", ""],
"PR" : ["","czesciowe_opady śniegu",""],
"TS" : ["", "burza_sniezna", ""],
"BL" : ["","zamiec_sniezna",""],
"SH" : ["", "przelotne opady sniegu", ""],
"DR" : ["","niska zamiec_sniezna",""],
"FZ" : ["","marznacy snieg",""]
},
"SG" : {
"" : ["","opady sniegu ziarnistego",""],
"-" : ["slabe", "",""],
"+" : ["silne", "",""],
"VC" : ["", "", "w_poblizu"],
"MI" : ["niskie", "",""],
"BC" : ["","opady platow sniegu_ziarnistego",""],
"PR" : ["","czesciowe opady sniegu_ziarnistego",""],
"TS" : ["","burza_sniezna",""],
"BL" : ["","","z_zawieją"],
"SH" : ["przelotne","",""],
"DR" : ["","niska zamiec_sniezna",""],
"FZ" : ["marznace","",""]
},
"IC" : {
"" : "%",
"-" : "%",
"+" : "%",
"BC" : "%", #"Patches of ice crystals",
"PR" : "%", #"Partial ice crystals",
"TS" : "%", #("Ice crystal storm", "storm"),
"BL" : "%", #"Blowing ice crystals",
"SH" : "%", #"Showers of ice crystals",
"DR" : "%", #"Drifting ice crystals",
"FZ" : "%" #"Freezing ice crystals",
},
"PE" : {
"" : "%", #"Moderate ice pellets",
"-" : "%",
"+" : "%",
"VC" : "%",
"MI" : "%",
"BC" : "%", #"Patches of ice pellets",
"PR" : "%", #"Partial ice pellets",
"TS" : "%", #("Ice pellets storm", "storm"),
"BL" : "%", #"Blowing ice pellets",
"SH" : "%", #"Showers of ice pellets",
"DR" : "%", #"Drifting ice pellets",
"FZ" : "%" #"Freezing ice pellets",
},
"GR" : {
"" : ["","grad",""],
"-" : ["slaby","",""],
"+" : ["silny","",""],
"VC" : ["","","w poblizu"],
"MI" : ["niski","",""],
"BC" : "%", #"Patches of hail",
"PR" : "%", #"Partial hail",
"TS" : ["","burza_gradowa",""],
"BL" : "%",
"SH" : ["","przelotne_opady gradu",""],
"DR" : "%", #"Drifting hail",
"FZ" : ["marznacy","grad",""]
},
"GS" : {
"" : "%",#"Moderate small hail",
"-" : "%", #["słaba", "",""],
"+" : "%", #["silna", "",""],
"VC" : "%", #["", "", "w pobliżu"],
"MI" : "%", #["niska", "",""],
"BC" : "%", #"Patches of small hail",
"PR" : "%", #"Partial small hail",
"TS" : "%", #("Small hailstorm", "storm"),
"BL" : "%", #"Blowing small hail",
"SH" : "%", #"Showers of small hail",
"DR" : "%", #"Drifting small hail",
"FZ" : "%", #"Freezing small hail",
},
"UP" : {
"" : "%", #"Moderate precipitation",
"-" : "%", #["słaba", "",""],
"+" : "%", #["silna", "",""],
"VC" : "%", #["", "", "w pobliżu"],
"MI" : "%", #["niska", "",""],
"BC" : "%", #"Patches of precipitation",
"PR" : "%", #"Partial precipitation",
"TS" : "%", #("Unknown thunderstorm", "storm"),
"BL" : "%", #"Blowing precipitation",
"SH" : "%", #"Showers, type unknown",
"DR" : "%", #"Drifting precipitation",
"FZ" : "%", #"Freezing precipitation",
},
"BR" : {
"" : ["","zamglenie",""],
"-" : ["lekkie","",""],
"+" : ["silne","",""],
"VC" : ["","","w_poblizu"],
"MI" : ["niskie","",""],
"BL" : ["","","z_wiatrem"],
"FZ" : ["marznace","",""],
},
"FG" : {
"" : ["","mgla",""],
"-" : ["lekka","",""],
"+" : ["silna","",""],
"VC" : ["","","w_poblizu"],
"MI" : ["niska","",""],
"BL" : ["","","z_wiatrem"],
"FZ" : ["marznaca","",""],
},
"SQ" : {
"" : ["","nawalnica",""],
"-" : ["slaba", "",""],
"+" : ["silna", "",""],
"VC" :["", "", "w_poblizu"],
}
}
knots2mps= 0.5144444444444445
kmh2mps = 0.2777777777777778
mi2km = 1.609344
rawTAF=None
ICAO = None
def __init__(self, ICAO=None, at=(0,0,0), taf=None, address=None):
# taf variable will be used for debug purposes.
if taf is not None:
report = taf.split("\n")
header=report[0].split()
originDate = header[1]
self.rawTAF = taf
if address== None:
#address = "http://140.90.128.70/pub/data/forecasts/taf/stations/%ICAO%.TXT"
#address = "http://204.227.127.33/metars/index.php?station_ids=%ICAO%&std_trans=standard&hoursStr=most+recent+only&chk_tafs=on&submitmet=Submit"
address = "http://aviationweather.gov/adds/tafs/index.php?station_ids=%ICAO%&std_trans=standard&hoursStr=most+recent+only&chk_tafs=on&submitmet=Submit"
if ICAO is not None:
self.ICAO=ICAO
url = address.replace("%ICAO%",str.upper(ICAO))
taf = self.getFile(url)
self.rawTAF = taf
report = taf.split("\n")
# ICAO = "EPWR" # <--------------------------------- delete me
dd,hh,mm=at
_period = re.compile('\d{4}/\d{4}')
_wind= re.compile('(VRB|\d{3})(\d{2,3})(?:(G)(\d{2,3}))?(KT|MPS|KMH)(?:(\d{3})(V)(\d{3}))?')
_vis = re.compile('(?:^|\s)\d\d\d\d(?:$|\s)|P?\dSM')
_weather = re.compile('^(?:\+|-|VC)?(?:MI|BC|DR|BL|SH|TS|FZ)?(?:(?:DZ|RA|SN|SG|IC|PE|GR|GS)|(?:BR|FG|FU|VA|DU|SA|HZ)|(?:PO|SQ|FC|SS|DS))$')
_skyCnd = re.compile("(SKC|FEW|SCT|BKN|OVC|NSC)(\d\d\d(VV\d\d\d)?)?(CB|TCU)?")
_temp = re.compile("""\s(M)?\d\d/(M)?\d\d\s| # METAR
T(M)?\d\d/?\d\dG # TAF""")
_press = re.compile("Q\d\d\d\d")
_cavok = re.compile("CAVOK")
_nsw = re.compile("NSW")
_valid = re.compile('^TAF\ |TEMPO|B(E)?CMG|PROB|GRADU|RAPID|%s'%ICAO)
for line in report:
if _valid.search(line) is not None:
TAFFound = True
tl = {"period":"", "wind":"", "vis":"", "weather":"",
"skyCnd":"", "temp":"", "press":"", "cavok":"", "nsw":""} #, "change":None}
for word in line.split(" "):
tl["period"] = (tl["period"] + " " + " ".join((self.flatten(_period.findall(word)) or "" ))).strip()
tl["wind"] = (tl["wind"] + " " + " ".join((self.flatten(_wind.findall(word)) or "" ))).strip()
tl["vis"] = (tl["vis"] + " " + " ".join((self.flatten(_vis.findall(word)) or "" ))).strip()
tl["weather"]= (tl["weather"]+ " " + " ".join((self.flatten(_weather.findall(word)) or "" ))).strip()
tl["temp"] = (tl["temp"] + " " + " ".join((self.flatten(_temp.findall(word)) or "" ))).strip()
tl["press"] = (tl["press"] + " " + " ".join((self.flatten(_press.findall(word)) or "" ))).strip()
tl["skyCnd"] = (tl["skyCnd"]+ " " + " ".join((self.flatten(_skyCnd.findall(word)) or "" ))).strip()
tl["cavok"] = (tl["cavok"] + " " + " ".join((self.flatten(_cavok.findall(word)) or "" ))).strip()
tl["nsw"] = (tl["nsw"] + " " + " ".join((self.flatten(_nsw.findall(word)) or "" ))).strip()
# tl["skyCnd"]= tl["skyCnd"]+ " ".join( (_skyCnd.findall(word)) )
self.tafData.append(tl)
TAF0 = self.tafData[0]
keys = ["period", "wind", "vis", "weather", "skyCnd", "temp", "press","cavok","nsw"]
for i in range(0,len(self.tafData)):
for elem in keys:
if self.tafData[i][elem].strip() == "":
self.tafData[i][elem] = TAF0[elem]
if self.tafData[i]["cavok"]== "CAVOK" and "".join(
(self.tafData[i]["vis"], self.tafData[i]["weather"], self.tafData[i]["skyCnd"]) ) == "":
self.tafData[i]["vis"] = "P6SM"
self.tafData[i]["weather"] = "NSW"
self.tafData[i]["skyCnd"]= "SKC"
elif self.tafData[i]["nsw"]=="NSW" and self.tafData[i]["weather"]:
self.tafData[i]["weather"]="NSW"
self.weather = self.prepare(dd,hh,mm)
def flatten(self,x): # http://kogs-www.informatik.uni-hamburg.de/~meine/python_tricks
"""flatten(sequence) -> list
Returns a single, flat list which contains all elements retrieved
from the sequence and all recursively contained sub-sequences
(iterables).
Examples:
>>> [1, 2, [3,4], (5,6)]
[1, 2, [3, 4], (5, 6)]
>>> flatten([[[1,2,3], (42,None)], [4,5], [6], 7, MyVector(8,9,10)])
[1, 2, 3, 42, None, 4, 5, 6, 7, 8, 9, 10]"""
result = []
for el in x:
#if isinstance(el, (list, tuple)):
if hasattr(el, "__iter__") and not isinstance(el, basestring):
result.extend(self.flatten(el))
else:
result.append(el)
return result
def getFile(self, url):
webFile = urllib.urlopen(url)
contents = webFile.read()
webFile.close()
return contents
def changeMoment(self,dd,hh,mm):
self.weather = self.prepare(dd,hh,mm)
def prepare(self,dd,hh,mm=0): # prepareForecast
# Finding proper forecast may be a difficult task. Consider following TAF:
#
# EPWR 212000Z 2121/2206 18005KT SCT040 (1)
# BECMG 2122/2224 29008KT (2)
# TEMPO 2121/2206 8000 SHRA BKN026CB (3)
# PROB30
# TEMPO 2121/2203 23012G30KT TSRA (4)
#
# +-----------------------+
# | (4) |
# +-----------------------------------+
# | +-------+ |
# | | (2) | (3) |
# +---+-------+-----------------------+
# | (1) |
# -+---+---+---+---+---+---+---+---+---+----------------------------------->
# 21 22 23 24 1 2 3 4 5 6 time (hr)
#
#
# Lets say we want to know the forecast for 21st of this month at 23 UTC.
# It' easy to see that we can use *every line* from this TAF message, but
# it seems that the 2nd one (2) should be the most appropriate one.
#
# I'll use the following algorithm to find proper "region":
# If we want to find a forecast for the moment t_0 we should:
# 1. chceck if forecastStart <= t_0 <= forecastEnd
# 2. If yes, "rating" number R = (t0 - forecastStart) + (forecastEnd - t0) =
# = forecastEnd-forecastStart, R --> 0.
# 3. If there is a forecast with the same R as before it wins
# (i.e. forecast (3) is better than "general" forecast (1) @ 4 UTC and
# forecast (4) is the best one @ 2 UTC.
#
# This algorithm does not use BECMG, PROB or TEMPO markers.
weather=None
R = 44640 # one month in minutes
t0= dd*24*60+hh*60+mm
for line in self.tafData:
if "/" in line["period"].strip():
l = line["period"].strip()
fromDD, fromHH, toDD, toHH = (int(l[0:2]), int(l[2:4]), int(l[5:7]), int(l[7:9]))
fs, fe = (fromDD*24*60+fromHH*60, toDD*24*60+toHH*60)
if between(fs, t0, fe) and R >= abs(fe-fs):
weather = line
R = abs(fe-fs)
return weather
def compact(self, list):
# Quite useful, from http://mail.python.org/pipermail/python-list/2007-May/613113.html
return " ".join(list).strip().replace(" "," ").split()
def getWindSpeed(self):
"""
Return the wind speed in meters per second.
"""
if self.weather == None:
return False
wind = self.weather["wind"].split()[1:]
if "KT" in wind:
u, c = ("KT", self.knots2mps)
elif "MPS" in wind:
u, c = ("MPS", 1)
elif "KMH" in wind:
u, c = ("KMH", self.kmh2mps)
else:
return None
if "G" in wind:
windSpeed = [ int(round(int(wind[wind.index('G')-1])*c)),
int(round(int(wind[wind.index('G')+1])*c)) ]
else:
windSpeed = [ int(round(int(wind[wind.index(u) -1])*c)) ]
return windSpeed
def getWindDirection(self):
if self.weather == None:
return False
direction = self.weather["wind"].split()
if 'V' in direction:
return [ int(direction[0]),
int(wind[wind.index('V')-1]),
int(wind[wind.index('V')+1]) ]
else:
if direction<>[] and direction[0]<>"VRB":
return ([int(direction[0])] or [None])
else:
return ["VRB"]
def getVisibility(self): # in kilometers, only simple examples
if self.weather == None:
return False
vis = self.weather["vis"].strip()
try:
if vis == 'P6SM' or vis=='9999':
return 10 # 10 km or more
elif 'SM' in vis :
return int(vis[-3])*self.mi2km
else:
return int(vis)/1000.0 # or None
except:
pass
return None
def getTemperature(self):
print "---> lib/taf.py: temperature not supported", self.weather["temp"]
return None
def getSkyConditions(self):
if self.weather["skyCnd"] is None:
return None
clouds = ['OVC','BKN','SCT','FEW','SKC','NSC']
for cloud in clouds:
if cloud in self.weather["skyCnd"]:
return cloud
return None
def getPressure(self):
print "---> lib/taf.py: pressure not supported", self.weather["press"]
return None
def getWeather(self):
if self.weather == None:
return False
wx = ["","",""]
rv = ""
for w in self.weather["weather"].strip().split():
wx = ["", "", ""]
for k in self._WeatherConditions.keys():
if k in w:
for elem in self._WeatherConditions[k]:
if elem in w:
if self._WeatherConditions[k][elem] == "%":
print " ".join( ("---> lib/taf.py: couldn't interpret",elem,"in",self.weather["weather"].strip().split()) )
else:
if self._WeatherConditions[k][elem][1]!="":
wx = self._WeatherConditions[k][elem]
elif self._WeatherConditions[k][elem][2]!="":
wx[2] = self._WeatherConditions[k][elem][2]
elif self._WeatherConditions[k][elem][0]!="" and self._WeatherConditions[k][elem][0] not in wx[0]:
wx[0] = " ".join( (wx[0], self._WeatherConditions[k][elem][0]) )
rv= " ".join( ( rv, " ".join( (wx) ) ) )
return rv
#a="""EPWR 282300Z 2900/2909 31004KT 6000 BKN015
# PROB40
# TEMPO 2900/2903 3000 SHRA SCT002 BKN010CB
# PROB30 2900/2907 1500 BR BKN001"""
#
#myTAF = taf(taf=a)
#
## nie zapomnij o linii 199
#
#print myTAF.rawTAF
#
#for hh in range(0,4):
# print myTAF.tafData
# myTAF.changeMoment(29,hh,00)
# print myTAF.weather
# print "godzina ", (hh,0)
# print "predkosc wiatru ", myTAF.getWindSpeed(), "m/s"
# print "kierunek wiatru ", myTAF.getWindDirection(), " st."
# print "widocznosc ", myTAF.getVisibility(), " km"
# print "pogoda ", myTAF.getWeather()
# print "---------------------------"