506 wiersze
19 KiB
Python
Executable File
506 wiersze
19 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
# Path to predictor binary
|
|
pred_binary = './pred_src/pred'
|
|
|
|
# Modules from the Python standard library.
|
|
import datetime
|
|
import time as timelib
|
|
import math
|
|
import sys
|
|
import os
|
|
import logging
|
|
import calendar
|
|
import optparse
|
|
import subprocess
|
|
import simplejson as json
|
|
|
|
# We use Pydap from http://pydap.org/.
|
|
import pydap.exceptions, pydap.client, pydap.lib
|
|
|
|
# Output logger format
|
|
log = logging.getLogger('main')
|
|
console = logging.StreamHandler()
|
|
console.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
|
|
log.addHandler(console)
|
|
|
|
progress_f = ''
|
|
progress = {
|
|
'gfs_percent': 0,
|
|
'gfs_timeremaining': '',
|
|
'gfs_complete': False,
|
|
'pred_running': False,
|
|
'pred_complete': False,
|
|
'progress_error': '',
|
|
}
|
|
|
|
def update_progress(**kwargs):
|
|
global progress_f
|
|
global progress
|
|
for arg in kwargs:
|
|
progress[arg] = kwargs[arg]
|
|
try:
|
|
progress_f.truncate(0)
|
|
progress_f.seek(0)
|
|
progress_f.write(json.dumps(progress))
|
|
progress_f.flush()
|
|
os.fsync(progress_f.fileno())
|
|
except IOError:
|
|
global log
|
|
log.error('Could not update progress file')
|
|
|
|
def main():
|
|
"""
|
|
The main program routine.
|
|
"""
|
|
|
|
# Set up our command line options
|
|
parser = optparse.OptionParser()
|
|
parser.add_option('-t', '--timestamp', dest='timestamp',
|
|
help='search for dataset covering the POSIX timestamp TIME \t[default: now]',
|
|
metavar='TIME', type='int',
|
|
default=calendar.timegm(datetime.datetime.utcnow().timetuple()))
|
|
parser.add_option('-o', '--output', dest='output',
|
|
help='file to write GFS data to with \'%(VAR)\' replaced with the the value of VAR [default: %default]',
|
|
metavar='FILE',
|
|
default='gfs/gfs_%(time)_%(lat)_%(lon)_%(latdelta)_%(londelta).dat')
|
|
parser.add_option('-v', '--verbose', action='count', dest='verbose',
|
|
help='be verbose. The more times this is specified the more verbose.', default=False)
|
|
parser.add_option('-p', '--past', dest='past',
|
|
help='window of time to save data is at most HOURS hours in past [default: %default]',
|
|
metavar='HOURS',
|
|
type='int', default=3)
|
|
parser.add_option('-f', '--future', dest='future',
|
|
help='window of time to save data is at most HOURS hours in future [default: %default]',
|
|
metavar='HOURS',
|
|
type='int', default=9)
|
|
parser.add_option('--hd', dest='hd', action="store_true",
|
|
help='use higher definition GFS data (default: no)')
|
|
parser.add_option('--preds', dest='preds_path',
|
|
help='path that contains uuid folders for predictions [default: %default]',
|
|
default='./predict/preds/', metavar='PATH')
|
|
|
|
group = optparse.OptionGroup(parser, "Location specifiers",
|
|
"Use these options to specify a particular tile of data to download.")
|
|
group.add_option('--lat', dest='lat',
|
|
help='tile centre latitude in range (-90,90) degrees north [default: %default]',
|
|
metavar='DEGREES',
|
|
type='float', default=52)
|
|
group.add_option('--lon', dest='lon',
|
|
help='tile centre longitude in degrees east [default: %default]',
|
|
metavar='DEGREES',
|
|
type='float', default=0)
|
|
group.add_option('--latdelta', dest='latdelta',
|
|
help='tile radius in latitude in degrees [default: %default]',
|
|
metavar='DEGREES',
|
|
type='float', default=5)
|
|
group.add_option('--londelta', dest='londelta',
|
|
help='tile radius in longitude in degrees [default: %default]',
|
|
metavar='DEGREES',
|
|
type='float', default=5)
|
|
parser.add_option_group(group)
|
|
|
|
#group = optparse.OptionGroup(parser, "Tile specifiers",
|
|
#"Use these options to specify how many tiles to download.")
|
|
#group.add_option('--lattiles', dest='lattiles',
|
|
#metavar='TILES',
|
|
#help='number of tiles along latitude to download [default: %default]',
|
|
#type='int', default=1)
|
|
#group.add_option('--lontiles', dest='lontiles',
|
|
#metavar='TILES',
|
|
#help='number of tiles along longitude to download [default: %default]',
|
|
#type='int', default=1)
|
|
#parser.add_option_group(group)
|
|
|
|
(options, args) = parser.parse_args()
|
|
|
|
# Check we got a UUID in the arguments
|
|
if len(args) != 1:
|
|
log.error('Exactly one positional argument should be supplied (uuid).')
|
|
sys.exit(1)
|
|
|
|
uuid = args[0]
|
|
uuid_path = options.preds_path + "/" + uuid + "/"
|
|
|
|
# Check we're not already running with this UUID
|
|
for line in os.popen('ps xa'):
|
|
process = " ".join(line.split()[4:])
|
|
if process.find(uuid) > 0:
|
|
pid = int(line.split()[0])
|
|
if pid != os.getpid():
|
|
log.error('A process is already running for this UUID, quitting.')
|
|
sys.exit(1)
|
|
|
|
# Make the UUID directory if non existant
|
|
if not os.path.exists(uuid_path):
|
|
os.mkdir(uuid_path, 0770)
|
|
|
|
# Open the progress.json file for writing, creating it and closing again to flush
|
|
global progress_f
|
|
global progress
|
|
try:
|
|
progress_f = open(uuid_path+"progress.json", "w")
|
|
update_progress(gfs_percent=0, gfs_timeremaining="Please wait...")
|
|
except IOError:
|
|
log.error('Error opening progress.json file')
|
|
sys.exit(1)
|
|
|
|
# Check the predictor binary exists
|
|
if not os.path.exists(pred_binary):
|
|
log.error('Predictor binary does not exist.')
|
|
sys.exit(1)
|
|
|
|
# Check the latitude is in the right range.
|
|
if (options.lat < -90) | (options.lat > 90):
|
|
log.error('Latitude %s is outside of the range (-90,90).')
|
|
sys.exit(1)
|
|
|
|
# Check the delta sizes are valid.
|
|
if (options.latdelta <= 0.5) | (options.londelta <= 0.5):
|
|
log.error('Latitiude and longitude deltas must be at least 0.5 degrees.')
|
|
sys.exit(1)
|
|
|
|
if options.londelta > 180:
|
|
log.error('Longitude window sizes greater than 180 degrees are meaningless.')
|
|
sys.exit(1)
|
|
|
|
# We need to wrap the longitude into the right range.
|
|
options.lon = canonicalise_longitude(options.lon)
|
|
|
|
# How verbose are we being?
|
|
if options.verbose > 0:
|
|
log.setLevel(logging.INFO)
|
|
if options.verbose > 1:
|
|
log.setLevel(logging.DEBUG)
|
|
if options.verbose > 2:
|
|
logging.basicConfig(level=logging.INFO)
|
|
if options.verbose > 3:
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
log.debug('Using cache directory: %s' % pydap.lib.CACHE)
|
|
|
|
timestamp_to_find = options.timestamp
|
|
time_to_find = datetime.datetime.utcfromtimestamp(timestamp_to_find)
|
|
|
|
log.info('Looking for latest dataset which covers %s' % time_to_find.ctime())
|
|
try:
|
|
dataset = dataset_for_time(time_to_find, options.hd)
|
|
except:
|
|
print('Could not locate a dataset for the requested time.')
|
|
sys.exit(1)
|
|
|
|
dataset_times = map(timestamp_to_datetime, dataset.time)
|
|
dataset_timestamps = map(datetime_to_posix, dataset_times)
|
|
|
|
log.info('Found appropriate dataset:')
|
|
log.info(' Start time: %s (POSIX %s)' % \
|
|
(dataset_times[0].ctime(), dataset_timestamps[0]))
|
|
log.info(' End time: %s (POSIX %s)' % \
|
|
(dataset_times[-1].ctime(), dataset_timestamps[-1]))
|
|
|
|
log.info(' Latitude: %s -> %s' % (min(dataset.lat), max(dataset.lat)))
|
|
log.info(' Longitude: %s -> %s' % (min(dataset.lon), max(dataset.lon)))
|
|
|
|
# for dlat in range(0,options.lattiles):
|
|
# for dlon in range(0,options.lontiles):
|
|
window = ( \
|
|
options.lat, options.latdelta, \
|
|
options.lon, options.londelta)
|
|
|
|
write_file(options.output, dataset, \
|
|
window, \
|
|
time_to_find - datetime.timedelta(hours=options.past), \
|
|
time_to_find + datetime.timedelta(hours=options.future))
|
|
|
|
purge_cache()
|
|
|
|
update_progress(gfs_percent=100, gfs_timeremaining='Done', gfs_complete=True, pred_running=True)
|
|
|
|
subprocess.call([pred_binary, '-i/var/www/hab/predict/gfs/', '-v', '-o'+uuid_path+'flight_path.csv', uuid_path+'scenario.ini'])
|
|
|
|
update_progress(pred_running=False, pred_complete=True)
|
|
|
|
def purge_cache():
|
|
"""
|
|
Purge the pydap cache (if set).
|
|
"""
|
|
|
|
if pydap.lib.CACHE is None:
|
|
return
|
|
|
|
log.info('Purging PyDAP cache.')
|
|
|
|
for file in os.listdir(pydap.lib.CACHE):
|
|
log.debug(' Deleting %s.' % file)
|
|
os.remove(pydap.lib.CACHE + file)
|
|
|
|
def write_file(output_format, data, window, mintime, maxtime):
|
|
log.info('Downloading data in window (lat, lon) = (%s +/- %s, %s +/- %s).' % window)
|
|
|
|
# Firstly, get the hgtprs variable to extract the times we're going to use.
|
|
hgtprs_global = data['hgtprs']
|
|
|
|
# Check the dimensions are what we expect.
|
|
assert(hgtprs_global.dimensions == ('time', 'lev', 'lat', 'lon'))
|
|
|
|
# Work out what times we want to download
|
|
times = filter(lambda x: (x >= mintime) & (x <= maxtime),
|
|
map(timestamp_to_datetime, hgtprs_global.maps['time']))
|
|
|
|
num_times = len(times)
|
|
current_time = 0
|
|
|
|
start_time = min(times)
|
|
end_time = max(times)
|
|
log.info('Downloading from %s to %s.' % (start_time.ctime(), end_time.ctime()))
|
|
|
|
# Filter the longitudes we're actually going to use.
|
|
longitudes = filter(lambda x: longitude_distance(x[1], window[2]) <= window[3] ,
|
|
enumerate(hgtprs_global.maps['lon']))
|
|
|
|
# Filter the latitudes we're actually going to use.
|
|
latitudes = filter(lambda x: math.fabs(x[1] - window[0]) <= window[1] ,
|
|
enumerate(hgtprs_global.maps['lat']))
|
|
|
|
update_progress(gfs_percent=10, gfs_timeremaining="Please wait...")
|
|
|
|
starttime = datetime.datetime.now()
|
|
|
|
# Write one file for each time index.
|
|
for timeidx, time in enumerate(hgtprs_global.maps['time']):
|
|
|
|
timestamp = datetime_to_posix(timestamp_to_datetime(time))
|
|
if (timestamp < datetime_to_posix(start_time)) | (timestamp > datetime_to_posix(end_time)):
|
|
continue
|
|
|
|
current_time += 1
|
|
|
|
log.info('Downloading data for %s.' % (timestamp_to_datetime(time).ctime()))
|
|
|
|
downloaded_data = { }
|
|
current_var = 0
|
|
time_per_var = datetime.timedelta()
|
|
for var in ('hgtprs', 'ugrdprs', 'vgrdprs'):
|
|
current_var += 1
|
|
grid = data[var]
|
|
log.info('Processing variable \'%s\' with shape %s...' % (var, grid.shape))
|
|
|
|
# Check the dimensions are what we expect.
|
|
assert(grid.dimensions == ('time', 'lev', 'lat', 'lon'))
|
|
|
|
# See if the longitude ragion wraps...
|
|
if (longitudes[0][0] == 0) & (longitudes[-1][0] == hgtprs_global.maps['lat'].shape[0]-1):
|
|
# Download the data. Unfortunately, OpeNDAP only supports remote access of
|
|
# contiguous regions. Since the longitude data wraps, we may require two
|
|
# windows. The 'right' way to fix this is to download a 'left' and 'right' window
|
|
# and munge them together. In terms of download speed, however, the overhead of
|
|
# downloading is so great that it is quicker to download all the longitude
|
|
# data in our slice and do the munging afterwards.
|
|
selection = grid[\
|
|
timeidx,
|
|
:, \
|
|
latitudes[0][0]:(latitudes[-1][0]+1),
|
|
: ]
|
|
else:
|
|
selection = grid[\
|
|
timeidx,
|
|
:, \
|
|
latitudes[0][0]:(latitudes[-1][0]+1),
|
|
longitudes[0][0]:(longitudes[-1][0]+1) ]
|
|
|
|
# Cache the downloaded data for later
|
|
downloaded_data[var] = selection
|
|
|
|
log.info(' Downloaded data has shape %s...', selection.shape)
|
|
if len(selection.shape) != 3:
|
|
log.error(' Expected 3-d data.')
|
|
return
|
|
|
|
now = datetime.datetime.now()
|
|
time_elapsed = now - starttime
|
|
num_vars = (current_time - 1)*3 + current_var
|
|
time_per_var = time_elapsed / num_vars
|
|
total_time = num_times * 3 * time_per_var
|
|
time_left = total_time - time_elapsed
|
|
time_left = timelib.strftime('%M:%S', timelib.gmtime(time_left.seconds))
|
|
|
|
update_progress(gfs_percent=int(
|
|
10 +
|
|
(((current_time - 1) * 90) / num_times) +
|
|
((current_var * 90) / (3 * num_times))
|
|
), gfs_timeremaining=time_left)
|
|
|
|
# Check all the downloaded data has the same shape
|
|
target_shape = downloaded_data['hgtprs']
|
|
assert( all( map( lambda x: x == target_shape,
|
|
filter( lambda x: x.shape, downloaded_data.itervalues() ) ) ) )
|
|
|
|
log.info('Writing output...')
|
|
|
|
hgtprs = downloaded_data['hgtprs']
|
|
ugrdprs = downloaded_data['ugrdprs']
|
|
vgrdprs = downloaded_data['vgrdprs']
|
|
|
|
log.debug('Using longitudes: %s' % (map(lambda x: x[1], longitudes),))
|
|
|
|
output_filename = output_format
|
|
output_filename = output_filename.replace('%(time)', str(timestamp))
|
|
output_filename = output_filename.replace('%(lat)', str(window[0]))
|
|
output_filename = output_filename.replace('%(latdelta)', str(window[1]))
|
|
output_filename = output_filename.replace('%(lon)', str(window[2]))
|
|
output_filename = output_filename.replace('%(londelta)', str(window[3]))
|
|
|
|
log.info(' Writing \'%s\'...' % output_filename)
|
|
output = open(output_filename, 'w')
|
|
|
|
# Write the header.
|
|
output.write('# window centre latitude, window latitude radius, window centre longitude, window longitude radius, POSIX timestamp\n')
|
|
header = window + (timestamp,)
|
|
output.write(','.join(map(str,header)) + '\n')
|
|
|
|
# Write the axis count.
|
|
output.write('# num_axes\n')
|
|
output.write('3\n') # FIXME: HARDCODED!
|
|
|
|
# Write each axis, a record showing the size and then one with the values.
|
|
output.write('# axis 1: pressures\n')
|
|
output.write(str(hgtprs.maps['lev'].shape[0]) + '\n')
|
|
output.write(','.join(map(str,hgtprs.maps['lev'][:])) + '\n')
|
|
output.write('# axis 2: latitudes\n')
|
|
output.write(str(len(latitudes)) + '\n')
|
|
output.write(','.join(map(lambda x: str(x[1]), latitudes)) + '\n')
|
|
output.write('# axis 3: longitudes\n')
|
|
output.write(str(len(longitudes)) + '\n')
|
|
output.write(','.join(map(lambda x: str(x[1]), longitudes)) + '\n')
|
|
|
|
# Write the number of lines of data.
|
|
output.write('# number of lines of data\n')
|
|
output.write('%s\n' % (hgtprs.shape[0] * len(latitudes) * len(longitudes)))
|
|
|
|
# Write the number of components in each data line.
|
|
output.write('# data line component count\n')
|
|
output.write('3\n') # FIXME: HARDCODED!
|
|
|
|
# Write the data itself.
|
|
output.write('# now the data in axis 3 major order\n')
|
|
output.write('# data is: '
|
|
'geopotential height [gpm], u-component wind [m/s], '
|
|
'v-component wind [m/s]\n')
|
|
for pressureidx, pressure in enumerate(hgtprs.maps['lev']):
|
|
for latidx, latitude in enumerate(hgtprs.maps['lat']):
|
|
for lonidx, longitude in enumerate(hgtprs.maps['lon']):
|
|
if longitude_distance(longitude, window[2]) > window[3]:
|
|
continue
|
|
record = ( hgtprs.array[pressureidx,latidx,lonidx], \
|
|
ugrdprs.array[pressureidx,latidx,lonidx], \
|
|
vgrdprs.array[pressureidx,latidx,lonidx] )
|
|
output.write(','.join(map(str,record)) + '\n')
|
|
|
|
def canonicalise_longitude(lon):
|
|
"""
|
|
The GFS model has all longitudes in the range 0.0 -> 359.5. Canonicalise
|
|
a longitude so that it fits in this range and return it.
|
|
"""
|
|
lon = math.fmod(lon, 360)
|
|
if lon < 0.0:
|
|
lon += 360.0
|
|
assert((lon >= 0.0) & (lon < 360.0))
|
|
return lon
|
|
|
|
def longitude_distance(lona, lonb):
|
|
"""
|
|
Return the shortest distance in degrees between longitudes lona and lonb.
|
|
"""
|
|
distances = ( \
|
|
math.fabs(lona - lonb), # Straightforward distance
|
|
360 - math.fabs(lona - lonb), # Other way 'round.
|
|
)
|
|
return min(distances)
|
|
|
|
def datetime_to_posix(time):
|
|
"""
|
|
Convert a datetime object to a POSIX timestamp.
|
|
"""
|
|
return calendar.timegm(time.timetuple())
|
|
|
|
def timestamp_to_datetime(timestamp):
|
|
"""
|
|
Convert a GFS fractional timestamp into a datetime object representing
|
|
that time.
|
|
"""
|
|
# The GFS timestmp is a floating point number fo days from the epoch,
|
|
# day '0' appears to be January 1st 1 AD.
|
|
|
|
(fractional_day, integer_day) = math.modf(timestamp)
|
|
|
|
# Unfortunately, the datetime module uses a different epoch.
|
|
ordinal_day = int(integer_day - 1)
|
|
|
|
# Convert the integer day to a time and add the fractional day.
|
|
return datetime.datetime.fromordinal(ordinal_day) + \
|
|
datetime.timedelta(days = fractional_day)
|
|
|
|
def possible_urls(time, hd):
|
|
"""
|
|
Given a datetime object representing a date and time, return a list of
|
|
possible data URLs which could cover that period.
|
|
|
|
The list is ordered from latest URL (i.e. most likely to be correct) to
|
|
earliest.
|
|
|
|
We assume that a particular data set covers a period of P days and
|
|
hence the earliest data set corresponds to time T - P and the latest
|
|
available corresponds to time T given target time T.
|
|
"""
|
|
|
|
period = datetime.timedelta(days = 7.5)
|
|
|
|
earliest = time - period
|
|
latest = time
|
|
|
|
if hd:
|
|
url_format = 'http://nomads.ncep.noaa.gov:9090/dods/gfs_hd/gfs_hd%i%02i%02i/gfs_hd_%02iz'
|
|
else:
|
|
url_format = 'http://nomads.ncep.noaa.gov:9090/dods/gfs/gfs%i%02i%02i/gfs_%02iz'
|
|
|
|
# Start from the latest, work to the earliest
|
|
proposed = latest
|
|
possible_urls = []
|
|
while proposed >= earliest:
|
|
for offset in ( 18, 12, 6, 0 ):
|
|
possible_urls.append(url_format % \
|
|
(proposed.year, proposed.month, proposed.day, offset))
|
|
proposed -= datetime.timedelta(days = 1)
|
|
|
|
return possible_urls
|
|
|
|
def dataset_for_time(time, hd):
|
|
"""
|
|
Given a datetime object, attempt to find the latest dataset which covers that
|
|
time and return pydap dataset object for it.
|
|
"""
|
|
|
|
url_list = possible_urls(time, hd)
|
|
|
|
for url in url_list:
|
|
try:
|
|
log.debug('Trying dataset at %s.' % url)
|
|
dataset = pydap.client.open_url(url)
|
|
|
|
start_time = timestamp_to_datetime(dataset.time[0])
|
|
end_time = timestamp_to_datetime(dataset.time[-1])
|
|
|
|
if start_time <= time and end_time >= time:
|
|
log.info('Found good dataset at %s.' % url)
|
|
return dataset
|
|
except pydap.exceptions.ServerError:
|
|
# Skip server error.
|
|
pass
|
|
|
|
raise RuntimeError('Could not find appropriate dataset.')
|
|
|
|
# If this is being run from the interpreter, run the main function.
|
|
if __name__ == '__main__':
|
|
main()
|
|
|