Add mutex for traffic/network ops.

pull/11/head
Christopher Young 2015-08-20 13:06:40 -04:00
rodzic 9b857c253a
commit 3ec5713831
2 zmienionych plików z 132 dodań i 51 usunięć

Wyświetl plik

@ -6,12 +6,14 @@ import (
"net" "net"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
) )
var messageQueue chan []byte var messageQueue chan []byte
var outSockets map[string]*net.UDPConn var outSockets map[string]*net.UDPConn
var dhcpLeases map[string]string var dhcpLeases map[string]string
var netMutex *sync.Mutex
// Read the "dhcpd.leases" file and parse out IP/hostname. // Read the "dhcpd.leases" file and parse out IP/hostname.
func getDHCPLeases() (map[string]string, error) { func getDHCPLeases() (map[string]string, error) {
@ -38,6 +40,8 @@ func getDHCPLeases() (map[string]string, error) {
} }
func sendToAllConnectedClients(msg []byte) { func sendToAllConnectedClients(msg []byte) {
netMutex.Lock()
defer netMutex.Unlock()
for _, sock := range outSockets { for _, sock := range outSockets {
sock.Write(msg) sock.Write(msg)
} }
@ -50,6 +54,8 @@ func getNetworkStats() int {
// See who has a DHCP lease and make a UDP connection to each of them. // See who has a DHCP lease and make a UDP connection to each of them.
func refreshConnectedClients() { func refreshConnectedClients() {
netMutex.Lock()
defer netMutex.Unlock()
validConnections := make(map[string]bool) validConnections := make(map[string]bool)
t, err := getDHCPLeases() t, err := getDHCPLeases()
if err != nil { if err != nil {
@ -111,6 +117,7 @@ func sendMsg(msg []byte) {
func initNetwork() { func initNetwork() {
messageQueue = make(chan []byte, 1024) // Buffered channel, 1024 messages. messageQueue = make(chan []byte, 1024) // Buffered channel, 1024 messages.
outSockets = make(map[string]*net.UDPConn) outSockets = make(map[string]*net.UDPConn)
netMutex = &sync.Mutex{}
refreshConnectedClients() refreshConnectedClients()
go messageQueueSender() go messageQueueSender()
} }

Wyświetl plik

@ -3,12 +3,12 @@ package main
import ( import (
"encoding/hex" "encoding/hex"
"math" "math"
"sync"
"time" "time"
) )
//-0b2b48fe3aef1f88621a0856110a31c01105c4e6c4e6c40a9a820300000000000000;rs=7; //-0b2b48fe3aef1f88621a0856110a31c01105c4e6c4e6c40a9a820300000000000000;rs=7;
/* /*
HDR: HDR:
@ -67,8 +67,11 @@ type TrafficInfo struct {
} }
var traffic map[uint32]TrafficInfo var traffic map[uint32]TrafficInfo
var trafficMutex *sync.Mutex
func cleanupOldEntries() { func cleanupOldEntries() {
trafficMutex.Lock()
defer trafficMutex.Unlock()
for icao_addr, ti := range traffic { for icao_addr, ti := range traffic {
if time.Since(ti.last_seen).Seconds() > float64(60) { //FIXME: 60 seconds with no update on this address - stop displaying. if time.Since(ti.last_seen).Seconds() > float64(60) { //FIXME: 60 seconds with no update on this address - stop displaying.
delete(traffic, icao_addr) delete(traffic, icao_addr)
@ -77,6 +80,8 @@ func cleanupOldEntries() {
} }
func sendTrafficUpdates() { func sendTrafficUpdates() {
trafficMutex.Lock()
defer trafficMutex.Unlock()
cleanupOldEntries() cleanupOldEntries()
for _, ti := range traffic { for _, ti := range traffic {
makeTrafficReport(ti) makeTrafficReport(ti)
@ -85,6 +90,7 @@ func sendTrafficUpdates() {
func initTraffic() { func initTraffic() {
traffic = make(map[uint32]TrafficInfo) traffic = make(map[uint32]TrafficInfo)
trafficMutex = &sync.Mutex{}
} }
func makeTrafficReport(ti TrafficInfo) { func makeTrafficReport(ti TrafficInfo) {
@ -113,7 +119,6 @@ func makeTrafficReport(ti TrafficInfo) {
msg[9] = tmp[1] // Longitude. msg[9] = tmp[1] // Longitude.
msg[10] = tmp[2] // Longitude. msg[10] = tmp[2] // Longitude.
//Altitude: OK //Altitude: OK
//TODO: 0xFFF "invalid altitude." //TODO: 0xFFF "invalid altitude."
alt := uint16(ti.alt) alt := uint16(ti.alt)
@ -146,7 +151,6 @@ func makeTrafficReport(ti TrafficInfo) {
sendMsg(prepareMessage(msg)) sendMsg(prepareMessage(msg))
} }
func parseDownlinkReport(s string) { func parseDownlinkReport(s string) {
var ti TrafficInfo var ti TrafficInfo
s = s[1:] s = s[1:]
@ -282,5 +286,75 @@ func parseDownlinkReport(s string) {
ti.last_seen = time.Now() ti.last_seen = time.Now()
trafficMutex.Lock()
traffic[ti.icao_addr] = ti traffic[ti.icao_addr] = ti
trafficMutex.Unlock()
} }
//TODO
/*
// dump1090Addr = "127.0.0.1:30003"
inConn, err := net.Dial("tcp", dump1090Addr)
if err != nil {
time.Sleep(1 * time.Second)
continue
}
rdr := bufio.NewReader(inConn)
for {
buf, err := rdr.ReadString('\n')
if err != nil { // Must have disconnected?
break
}
buf = strings.Trim(buf, "\r\n")
//log.Printf("%s\n", buf)
x := strings.Split(buf, ",")
//TODO: Add more sophisticated stuff that combines heading/speed updates with the location.
if len(x) < 22 {
continue
}
icao := x[4]
icaoDec, err := strconv.ParseInt(icao, 16, 32)
if err != nil {
continue
}
mutex.Lock()
// Retrieve previous information on this ICAO code.
var pi PositionInfo
if _, ok := blips[icaoDec]; ok {
pi = blips[icaoDec]
}
if x[1] == "3" {
//MSG,3,111,11111,AC2BB7,111111,2015/07/28,03:59:12.363,2015/07/28,03:59:12.353,,5550,,,42.35847,-83.42212,,,,,,0
alt := x[11]
lat := x[14]
lng := x[15]
//log.Printf("icao=%s, icaoDec=%d, alt=%s, lat=%s, lng=%s\n", icao, icaoDec, alt, lat, lng)
pi.alt = alt
pi.lat = lat
pi.lng = lng
}
if x[1] == "4" {
// MSG,4,111,11111,A3B557,111111,2015/07/28,06:13:36.417,2015/07/28,06:13:36.398,,,414,278,,,-64,,,,,0
vel := x[12]
hdg := x[13]
vr := x[16]
//log.Printf("icao=%s, icaoDec=%d, vel=%s, hdg=%s, vr=%s\n", icao, icaoDec, vel, hdg, vr)
pi.vel = vel
pi.hdg = hdg
pi.vr = vr
}
if x[1] == "1" {
// MSG,1,,,%02X%02X%02X,,,,,,%s,,,,,,,,0,0,0,0
tail := x[10]
pi.tail = tail
}
// Update "last seen" (any type of message).
pi.last_seen = time.Now()
blips[icaoDec] = pi // Update information on this ICAO code.
mutex.Unlock()
*/