kopia lustrzana https://github.com/gshau/wxserver
Added/rm files for influx database
rodzic
eff260ec63
commit
cb299beaf9
198
app/stationDB.py
198
app/stationDB.py
|
@ -1,198 +0,0 @@
|
||||||
import sqlite3
|
|
||||||
import pandas as pd
|
|
||||||
import stationDB as st
|
|
||||||
from jinja2 import Template
|
|
||||||
|
|
||||||
from bokeh.plotting import *
|
|
||||||
from bokeh.resources import INLINE
|
|
||||||
# from bokeh.util.browser import view
|
|
||||||
|
|
||||||
from bokeh.models import HoverTool
|
|
||||||
from bokeh.embed import components
|
|
||||||
import time
|
|
||||||
import datetime
|
|
||||||
import os.path
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class DB:
|
|
||||||
def __init__(self,fileName, table):
|
|
||||||
self.fileName=fileName
|
|
||||||
self.table=table
|
|
||||||
self.columnNames=[]
|
|
||||||
if not os.path.isfile(self.fileName):
|
|
||||||
self.createTable()
|
|
||||||
self.getColumnNames()
|
|
||||||
|
|
||||||
def createTable(self):
|
|
||||||
con=sqlite3.connect(self.fileName)
|
|
||||||
c=con.cursor()
|
|
||||||
c.execute("CREATE TABLE %s (timestamp real, timeString text)" % self.table)
|
|
||||||
|
|
||||||
con.close()
|
|
||||||
|
|
||||||
def addColumn(self, name):
|
|
||||||
con=sqlite3.connect(self.fileName)
|
|
||||||
c=con.cursor()
|
|
||||||
type="real"
|
|
||||||
c.execute("ALTER TABLE %s ADD COLUMN %s %s" % (self.table, name, type))
|
|
||||||
con.close()
|
|
||||||
|
|
||||||
def getColumnNames(self):
|
|
||||||
con=sqlite3.connect(self.fileName)
|
|
||||||
command="SELECT * from %s" % self.table
|
|
||||||
self.df=pd.read_sql_query(command, con)
|
|
||||||
self.columnNames=self.df.columns
|
|
||||||
con.close()
|
|
||||||
|
|
||||||
def addData(self, packet):
|
|
||||||
con=sqlite3.connect(self.fileName)
|
|
||||||
c=con.cursor()
|
|
||||||
|
|
||||||
# create column if not present
|
|
||||||
for key in packet.keys():
|
|
||||||
if key not in self.columnNames:
|
|
||||||
self.addColumn(key)
|
|
||||||
self.getColumnNames()
|
|
||||||
|
|
||||||
|
|
||||||
columns=', '.join(packet.keys())
|
|
||||||
placeholders = ', '.join('?' * len(packet))
|
|
||||||
sql = 'INSERT INTO master ({}) VALUES ({})'.format(columns, placeholders)
|
|
||||||
c.execute(sql, packet.values())
|
|
||||||
|
|
||||||
# # command="create table if not exists %s (date real, dateString text, name text, value real)" % name
|
|
||||||
# c.execute(command)
|
|
||||||
# timeString=datetime.datetime.fromtimestamp(time).strftime('%Y/%m/%d %H:%M:%S')
|
|
||||||
# dataTuple=(time, timeString, name, data,)
|
|
||||||
# command="INSERT INTO %s VALUES (?, ?, ?, ?)" % name
|
|
||||||
# c.execute(command, dataTuple)
|
|
||||||
con.commit()
|
|
||||||
con.close()
|
|
||||||
|
|
||||||
def loadDB(self,name):
|
|
||||||
con=sqlite3.connect(self.fileName)
|
|
||||||
command="SELECT * from %s" % name
|
|
||||||
self.df=pd.read_sql_query(command, con)
|
|
||||||
con.close()
|
|
||||||
|
|
||||||
class DBview:
|
|
||||||
def __init__(self,db,df,UTCOffset):
|
|
||||||
# self.timeRange=timeRange
|
|
||||||
self.db = db
|
|
||||||
self.df = df
|
|
||||||
self.UTCOffset = UTCOffset
|
|
||||||
|
|
||||||
|
|
||||||
def qp(self,attrList, name, timeRange):
|
|
||||||
now=datetime.datetime.now()
|
|
||||||
epoch=datetime.datetime.utcfromtimestamp(0)
|
|
||||||
tstart=now-datetime.timedelta(days=timeRange)
|
|
||||||
tcut=(tstart-epoch).total_seconds()
|
|
||||||
dataFrame=self.df[self.df.timestamp>tcut]
|
|
||||||
dataFrame['t']=dataFrame.timestamp*1000
|
|
||||||
# y1=getattr(self.df,attr1)
|
|
||||||
# y2=getattr(self.df,attr2)
|
|
||||||
output_file(name+'.html')
|
|
||||||
timeString=[datetime.datetime.fromtimestamp(dt).strftime('%Y/%m/%d %H:%M:%S') for dt in dataFrame.timestamp]
|
|
||||||
source=ColumnDataSource(data=dataFrame.to_dict('list'))
|
|
||||||
TOOLS="resize,hover,crosshair,pan,wheel_zoom,box_zoom,reset,tap,previewsave,box_select,poly_select,lasso_select"
|
|
||||||
p=figure(x_axis_type="datetime",tools=TOOLS)
|
|
||||||
for key in attrList:
|
|
||||||
print key
|
|
||||||
p.scatter('t',key, source=source)
|
|
||||||
p.select(dict(type=HoverTool)).tooltips=[
|
|
||||||
("Time", "@timeString"),
|
|
||||||
("Value", "@y1"),
|
|
||||||
("Value", "@y2"),
|
|
||||||
]
|
|
||||||
show(p)
|
|
||||||
|
|
||||||
def qph(self,attrList, name, timeRange):
|
|
||||||
now=datetime.datetime.now()
|
|
||||||
epoch=datetime.datetime.utcfromtimestamp(0)
|
|
||||||
tstart=now-datetime.timedelta(days=0,hours=timeRange)
|
|
||||||
tcut=(tstart-epoch).total_seconds()
|
|
||||||
dataFrame=self.df[self.df.timestamp>tcut]
|
|
||||||
dataFrame['t']=dataFrame.timestamp*1000
|
|
||||||
# y1=getattr(self.df,attr1)
|
|
||||||
# y2=getattr(self.df,attr2)
|
|
||||||
output_file(name+'.html')
|
|
||||||
timeString=[datetime.datetime.fromtimestamp(dt).strftime('%Y/%m/%d %H:%M:%S') for dt in dataFrame.timestamp]
|
|
||||||
source=ColumnDataSource(data=dataFrame.to_dict('list'))
|
|
||||||
TOOLS="resize,hover,crosshair,pan,wheel_zoom,box_zoom,reset,tap,previewsave,box_select,poly_select,lasso_select"
|
|
||||||
p=figure(x_axis_type="datetime",tools=TOOLS)
|
|
||||||
for key in attrList:
|
|
||||||
print key
|
|
||||||
p.scatter('t',key, source=source)
|
|
||||||
p.select(dict(type=HoverTool)).tooltips=[
|
|
||||||
("Time", "@timeString"),
|
|
||||||
("Value", "@y1"),
|
|
||||||
("Value", "@y2"),
|
|
||||||
]
|
|
||||||
show(p)
|
|
||||||
|
|
||||||
return source
|
|
||||||
|
|
||||||
|
|
||||||
def qphLive(self,attrList, name, timeRange):
|
|
||||||
# attrList=['stationLoadVolt']
|
|
||||||
now=datetime.datetime.now()
|
|
||||||
epoch=datetime.datetime.utcfromtimestamp(0)
|
|
||||||
tstart=now-datetime.timedelta(days=0,hours=timeRange-self.UTCOffset)
|
|
||||||
tcut=(tstart-epoch).total_seconds()
|
|
||||||
dataFrame=self.df[self.df.timestamp>tcut]
|
|
||||||
dataFrame['t']=dataFrame.timestamp*1000 - self.UTCOffset*3600*1000
|
|
||||||
output_server(name,url='http://10.0.1.2:5006')
|
|
||||||
colors=["red","blue","green","orange","purple","black","gray","magenta","cyan","brown","gold","darkkhaki","darksalmon"]
|
|
||||||
timeString=[datetime.datetime.fromtimestamp(dt).strftime('%Y/%m/%d %H:%M:%S') for dt in dataFrame.timestamp]
|
|
||||||
source=ColumnDataSource(data=dataFrame.to_dict('list'))
|
|
||||||
TOOLS="resize,hover,crosshair,pan,wheel_zoom,box_zoom,reset,tap,previewsave,box_select,poly_select,lasso_select"
|
|
||||||
keyList=attrList.keys()
|
|
||||||
p={}
|
|
||||||
ds={}
|
|
||||||
for mainKey in keyList:
|
|
||||||
if mainKey == keyList[0]:
|
|
||||||
p[mainKey]=figure(x_axis_type="datetime",tools=TOOLS, width=600, height=400, title=mainKey)
|
|
||||||
else:
|
|
||||||
p[mainKey]=figure(x_axis_type="datetime",tools=TOOLS, width=600, height=400, title=mainKey, x_range=p[keyList[0]].x_range)
|
|
||||||
ikey=0
|
|
||||||
hover={}
|
|
||||||
for key in attrList[mainKey]:
|
|
||||||
print key
|
|
||||||
# keySource=ColumnDataSource({'x': source.data['t'], 'y': series.values, 'series_name': name_for_display, 'Date': toy_df.index.format()})
|
|
||||||
p[mainKey].scatter('t',key, source=source, name=key,fill_color=colors[ikey],line_color=colors[ikey], legend=key)
|
|
||||||
|
|
||||||
hover = p[mainKey].select(dict(type=HoverTool))
|
|
||||||
# hover[ikey].renderers=[source.data[key]]
|
|
||||||
# hover[ikey].tooltips=tooltips+[("Series",key),("Time","@timeString"), ("Value", "@"+key)]
|
|
||||||
hover.tooltips=[("Series",key),("Time","@timeString"), ("Value", "@"+key)]
|
|
||||||
# hover.mode = "mouse"
|
|
||||||
ikey+=1
|
|
||||||
p[mainKey].legend.orientation="top_left"
|
|
||||||
renderer = p[mainKey].select(dict(name=key))
|
|
||||||
ds[mainKey]=renderer[0].data_source
|
|
||||||
|
|
||||||
|
|
||||||
# allP = vplot(*p.values())
|
|
||||||
# allP = gridplot([p.values()])
|
|
||||||
group=lambda flat, size: [flat[i:i+size] for i in range(0,len(flat), size)]
|
|
||||||
allP = gridplot(group(p.values(),1))
|
|
||||||
show(allP)
|
|
||||||
|
|
||||||
|
|
||||||
while True:
|
|
||||||
print 'updating...'
|
|
||||||
self.db=st.DB('data.sdb','master')
|
|
||||||
self.db.loadDB('master')
|
|
||||||
self.df = self.db.df
|
|
||||||
now=datetime.datetime.now()
|
|
||||||
tstart=now-datetime.timedelta(days=0,hours=timeRange-self.UTCOffset)
|
|
||||||
tcut=(tstart-epoch).total_seconds()
|
|
||||||
dataFrame=self.df[self.df.timestamp>tcut]
|
|
||||||
dataFrame['t']=dataFrame.timestamp*1000 - self.UTCOffset*3600*1000
|
|
||||||
for mainKey in keyList:
|
|
||||||
ds[mainKey].data = dataFrame.to_dict('list')
|
|
||||||
# print ds.data['stationIRTemp']
|
|
||||||
cursession().store_objects(ds[mainKey])
|
|
||||||
time.sleep(30)
|
|
|
@ -1,26 +0,0 @@
|
||||||
import stationDB as st
|
|
||||||
import numpy as np
|
|
||||||
import sqlite3
|
|
||||||
import pandas as pd
|
|
||||||
from bokeh.plotting import *
|
|
||||||
from bokeh.models import HoverTool
|
|
||||||
|
|
||||||
import time
|
|
||||||
import datetime
|
|
||||||
import pytz
|
|
||||||
|
|
||||||
db=st.DB('data.sdb','master')
|
|
||||||
db.loadDB('master')
|
|
||||||
UTCOffset = -5
|
|
||||||
dbv=st.DBview(db, db.df, UTCOffset)
|
|
||||||
|
|
||||||
attrList={}
|
|
||||||
attrList['Station Temperature']=['WeatherStationTemperature','WeatherStationHTUTemp','WeatherStationDewpoint']
|
|
||||||
attrList['Station IR']=['WeatherStationIRTemp','WeatherStationMLXTemp']
|
|
||||||
attrList['Brightness']=['WeatherStationBrightness']
|
|
||||||
attrList['Voltage']=['WeatherStationLoad']
|
|
||||||
attrList['Current']=['WeatherStationCurrent']
|
|
||||||
attrList['UV']=['WeatherStationUVIndex']
|
|
||||||
|
|
||||||
viewHistory=24
|
|
||||||
dbv.qphLive(attrList, 'DayView', viewHistory)
|
|
|
@ -11,19 +11,24 @@ import datetime
|
||||||
from crypt import *
|
from crypt import *
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
|
from influxdb import InfluxDBClient
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Station:
|
class Station:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.name='Rpi'
|
self.name='Rpi'
|
||||||
self.udpPort=8123
|
self.udpPort=8123
|
||||||
self.BUF_SIZE=1024
|
self.BUF_SIZE=2048
|
||||||
self.packets={}
|
self.packets={}
|
||||||
self.packetMean={}
|
self.packetMean={}
|
||||||
self.nMeasurements={}
|
self.nMeasurements={}
|
||||||
self.lastUpdate={}
|
self.lastUpdate={}
|
||||||
self.avgFreq=30
|
# self.avgFreq=30
|
||||||
self.lastRelease=datetime.datetime.now()
|
self.lastRelease=datetime.datetime.now()
|
||||||
# self.secret_key = '1234567890123456'
|
influxIP = '127.0.0.1'
|
||||||
|
self.client = InfluxDBClient(influxIP, 8086, 'admin', 'admin', 'stationDB')
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -64,52 +69,20 @@ class Station:
|
||||||
print "Message from ", addr, " :", packet
|
print "Message from ", addr, " :", packet
|
||||||
return packet
|
return packet
|
||||||
|
|
||||||
def checkForPacketRelease(self):
|
|
||||||
releasePacket=False
|
|
||||||
now=datetime.datetime.now()
|
|
||||||
releaseTime=0
|
|
||||||
if (now-self.lastRelease).seconds > self.avgFreq:
|
|
||||||
self.packetMean['timestamp']=time.time()
|
|
||||||
|
|
||||||
releasePacket=True
|
|
||||||
releaseTime=float(now.strftime("%s"))
|
|
||||||
self.lastRelease=now
|
|
||||||
timeString=now.strftime('%Y/%m/%d %H:%M:%S')
|
|
||||||
self.packetMean['timeString']=timeString
|
|
||||||
self.nMeasurements['timeString']=1
|
|
||||||
# reset packet list
|
|
||||||
for key in self.packets.keys():
|
|
||||||
self.packets[key]=[]
|
|
||||||
|
|
||||||
|
|
||||||
return (releasePacket,releaseTime)
|
|
||||||
|
|
||||||
|
|
||||||
def updateMeasurement(self, key, value):
|
|
||||||
|
|
||||||
if key not in self.lastUpdate.keys():
|
|
||||||
self.lastUpdate[key]=datetime.datetime.now()
|
|
||||||
|
|
||||||
self.lastUpdate[key]=datetime.datetime.now()
|
|
||||||
if key not in self.packets.keys():
|
|
||||||
self.packets[key]=value
|
|
||||||
self.packets[key]=np.append(self.packets[key],value)
|
|
||||||
self.packetMean[key] = np.mean(self.packets[key])
|
|
||||||
self.nMeasurements[key] = len(self.packets[key])
|
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
db=stationDB.DB('data.sdb','master')
|
#db=stationDB.DB('/root/sio/data.sdb','master')
|
||||||
s=Station()
|
s=Station()
|
||||||
s.udpPort=9990
|
s.udpPort=9990
|
||||||
s.startUDPListen('0.0.0.0',s.udpPort)
|
s.startUDPListen('0.0.0.0',s.udpPort)
|
||||||
|
|
||||||
sio=SocketIO('localhost', 5000)
|
sio=SocketIO('localhost', 5000)
|
||||||
verbose=False
|
verbose=True #False
|
||||||
readPackets=True
|
readPackets=True
|
||||||
while readPackets:
|
while readPackets:
|
||||||
rawPacket=s.recvPacket(verbose)
|
rawPacket=s.recvPacket(verbose)
|
||||||
print rawPacket
|
print 'raw Packet: ',rawPacket
|
||||||
|
print 'length packet: ', len(rawPacket)
|
||||||
try:
|
try:
|
||||||
sio.emit('dataPacket', rawPacket)
|
sio.emit('dataPacket', rawPacket)
|
||||||
print 'sent packet'
|
print 'sent packet'
|
||||||
|
@ -119,15 +92,14 @@ while readPackets:
|
||||||
# rawPacket['name']
|
# rawPacket['name']
|
||||||
|
|
||||||
for dataName in rawPacket['data'].keys():
|
for dataName in rawPacket['data'].keys():
|
||||||
|
packet = rawPacket['name']
|
||||||
key = rawPacket['name'].replace(' ','') + dataName.replace(' ','')
|
key = rawPacket['name'].replace(' ','') + dataName.replace(' ','')
|
||||||
value = rawPacket['data'][dataName]['value']
|
value = rawPacket['data'][dataName]['value']
|
||||||
releasePacket=s.updateMeasurement(key,value)
|
|
||||||
releasePacket,releaseTime=s.checkForPacketRelease()
|
|
||||||
if releasePacket:
|
|
||||||
for (key,value) in s.packetMean.items():
|
|
||||||
print 'Updating db: ',key,' = ',value
|
print 'Updating db: ',key,' = ',value
|
||||||
db.addData(s.packetMean)
|
now = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S")
|
||||||
s.packetMean={}
|
json_body = [{"measurement": dataName, "tags": {"host": packet},"time": now,"fields": {"value": value}}]
|
||||||
|
print json_body
|
||||||
|
s.client.write_points(json_body)
|
||||||
|
|
||||||
|
|
||||||
s.sock.close()
|
s.sock.close()
|
Ładowanie…
Reference in New Issue