From e15c389975b9a5b79371ccd9df092c577fcd8419 Mon Sep 17 00:00:00 2001 From: AvSquirrel Date: Mon, 15 Feb 2016 01:27:02 +0000 Subject: [PATCH 1/2] Traffic message network optimization. Bug fix. --- main/gen_gdl90.go | 59 +++++++++++++++++++++++++++++++------------- main/network.go | 37 +++++++++++++++++++++++++--- main/ry835ai.go | 2 +- main/traffic.go | 63 ++++++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 136 insertions(+), 25 deletions(-) diff --git a/main/gen_gdl90.go b/main/gen_gdl90.go index 44eea303..9a1c19a8 100644 --- a/main/gen_gdl90.go +++ b/main/gen_gdl90.go @@ -604,9 +604,25 @@ func heartBeatSender() { sendGDL90(makeHeartbeat(), false) sendGDL90(makeStratuxHeartbeat(), false) sendGDL90(makeStratuxStatus(), false) - // sendGDL90(makeTrafficReport()) makeOwnshipReport() makeOwnshipGeometricAltitudeReport() + /* --- debug code: traffic demo* --- / + /* Uncomment and compile to display huge number of artificial traffic targets + numTargets := uint32(1000) + hexCode := uint32(0xFF0000) + + for i := uint32(0); i < numTargets; i++ { + tail := fmt.Sprintf("DEMO%d", i) + alt := float32((i*117%2000)*25 + 2000) + hdg := float64((i * 37) % 360) + spd := float64(100 + ((i*7)%61)*13) + + updateDemoTraffic(i|hexCode, tail, alt, spd, hdg) + + } + + */ + /* ---end traffic demo code ---*/ sendTrafficUpdates() updateStatus() case <-timerMessageStats.C: @@ -1000,22 +1016,30 @@ type settings struct { } type status struct { - Version string - Devices uint32 - Connected_Users uint - UAT_messages_last_minute uint - uat_products_last_minute map[string]uint32 - UAT_messages_max uint - ES_messages_last_minute uint - ES_messages_max uint - GPS_satellites_locked uint16 - GPS_satellites_seen uint16 - GPS_satellites_tracked uint16 - GPS_connected bool - GPS_solution string - RY835AI_connected bool - Uptime int64 - CPUTemp float32 + Version string + Devices uint32 + Connected_Users uint + UAT_messages_last_minute uint + uat_products_last_minute map[string]uint32 + UAT_messages_max uint + ES_messages_last_minute uint + ES_messages_max uint + GPS_satellites_locked uint16 + GPS_satellites_seen uint16 + GPS_satellites_tracked uint16 + GPS_connected bool + GPS_solution string + RY835AI_connected bool + Uptime int64 + CPUTemp float32 + NetworkDataMessagesSent uint64 + NetworkDataMessagesSentNonqueueable uint64 + NetworkDataBytesSent uint64 + NetworkDataBytesSentNonqueueable uint64 + NetworkDataMessagesSentLastSec uint64 + NetworkDataMessagesSentNonqueueableLastSec uint64 + NetworkDataBytesSentLastSec uint64 + NetworkDataBytesSentNonqueueableLastSec uint64 } var globalSettings settings @@ -1135,6 +1159,7 @@ func printStats() { log.Printf("stats [started: %s]\n", humanize.RelTime(time.Time{}, stratuxClock.Time, "ago", "from now")) log.Printf(" - CPUTemp=%.02f deg C, MemStats.Alloc=%s, MemStats.Sys=%s, totalNetworkMessagesSent=%s\n", globalStatus.CPUTemp, humanize.Bytes(uint64(memstats.Alloc)), humanize.Bytes(uint64(memstats.Sys)), humanize.Comma(int64(totalNetworkMessagesSent))) log.Printf(" - UAT/min %s/%s [maxSS=%.02f%%], ES/min %s/%s\n, Total traffic targets tracked=%s", humanize.Comma(int64(globalStatus.UAT_messages_last_minute)), humanize.Comma(int64(globalStatus.UAT_messages_max)), float64(maxSignalStrength)/10.0, humanize.Comma(int64(globalStatus.ES_messages_last_minute)), humanize.Comma(int64(globalStatus.ES_messages_max)), humanize.Comma(int64(len(seenTraffic)))) + log.Printf(" - Network data messages sent: %d total, %d nonqueueable. Network data bytes sent: %d total, %d nonqueueable.\n", globalStatus.NetworkDataMessagesSent, globalStatus.NetworkDataMessagesSentNonqueueable, globalStatus.NetworkDataBytesSent, globalStatus.NetworkDataBytesSentNonqueueable) if globalSettings.GPS_Enabled { log.Printf(" - Last GPS fix: %s, GPS solution type: %d using %d satellites (%d/%d seen/tracked), NACp: %d, est accuracy %.02f m\n", stratuxClock.HumanizeTime(mySituation.LastFixLocalTime), mySituation.quality, mySituation.Satellites, mySituation.SatellitesSeen, mySituation.SatellitesTracked, mySituation.NACp, mySituation.Accuracy) log.Printf(" - GPS vertical velocity: %.02f ft/sec; GPS vertical accuracy: %v m\n", mySituation.GPSVertVel, mySituation.AccuracyVert) diff --git a/main/network.go b/main/network.go index d72ae658..b86d8346 100644 --- a/main/network.go +++ b/main/network.go @@ -124,14 +124,20 @@ func sendToAllConnectedClients(msg networkMessage) { } // Send non-queueable messages immediately, or discard if the client is in sleep mode. - if sleepFlag { - continue + if !sleepFlag { + netconn.numOverflows = 0 // Reset the overflow counter whenever the client is not sleeping so that we're not penalizing future sleepmodes. } - netconn.numOverflows = 0 // Reset the overflow counter whenever the client is not sleeping so that we're not penalizing future sleepmodes. if !msg.queueable { + if sleepFlag { + continue + } netconn.Conn.Write(msg.msg) // Write immediately. totalNetworkMessagesSent++ + globalStatus.NetworkDataMessagesSent++ + globalStatus.NetworkDataMessagesSentNonqueueable++ + globalStatus.NetworkDataBytesSent += uint64(len(msg.msg)) + globalStatus.NetworkDataBytesSentNonqueueable += uint64(len(msg.msg)) } else { // Queue the message if the message is "queueable". if len(netconn.messageQueue) >= maxUserMsgQueueSize { // Too many messages queued? Drop the oldest. @@ -220,6 +226,8 @@ func messageQueueSender() { tmpConn := netconn tmpConn.Conn.Write(tmpConn.messageQueue[0]) totalNetworkMessagesSent++ + globalStatus.NetworkDataMessagesSent++ + globalStatus.NetworkDataBytesSent += uint64(len(tmpConn.messageQueue[0])) tmpConn.messageQueue = tmpConn.messageQueue[1:] outSockets[k] = tmpConn } @@ -355,6 +363,28 @@ func sleepMonitor() { } } +func networkStatsCounter() { + timer := time.NewTicker(1 * time.Second) + var previousNetworkMessagesSent, previousNetworkBytesSent, previousNetworkMessagesSentNonqueueable, previousNetworkBytesSentNonqueueable uint64 + + for { + <-timer.C + globalStatus.NetworkDataMessagesSentLastSec = globalStatus.NetworkDataMessagesSent - previousNetworkMessagesSent + globalStatus.NetworkDataBytesSentLastSec = globalStatus.NetworkDataBytesSent - previousNetworkBytesSent + globalStatus.NetworkDataMessagesSentNonqueueableLastSec = globalStatus.NetworkDataMessagesSentNonqueueable - previousNetworkMessagesSentNonqueueable + globalStatus.NetworkDataBytesSentNonqueueableLastSec = globalStatus.NetworkDataBytesSentNonqueueable - previousNetworkBytesSentNonqueueable + + // debug option. Uncomment to log per-second network statistics. Useful for debugging WiFi instability. + //log.Printf("Network data messages sent: %d total, %d last second. Network data bytes sent: %d total, %d last second.\n", globalStatus.NetworkDataMessagesSent, globalStatus.NetworkDataMessagesSentLastSec, globalStatus.NetworkDataBytesSent, globalStatus.NetworkDataBytesSentLastSec) + + previousNetworkMessagesSent = globalStatus.NetworkDataMessagesSent + previousNetworkBytesSent = globalStatus.NetworkDataBytesSent + previousNetworkMessagesSentNonqueueable = globalStatus.NetworkDataMessagesSentNonqueueable + previousNetworkBytesSentNonqueueable = globalStatus.NetworkDataBytesSentNonqueueable + } + +} + func initNetwork() { messageQueue = make(chan networkMessage, 1024) // Buffered channel, 1024 messages. outSockets = make(map[string]networkConnection) @@ -364,4 +394,5 @@ func initNetwork() { go monitorDHCPLeases() go messageQueueSender() go sleepMonitor() + go networkStatsCounter() } diff --git a/main/ry835ai.go b/main/ry835ai.go index 580cdd41..377af6cf 100644 --- a/main/ry835ai.go +++ b/main/ry835ai.go @@ -1114,7 +1114,7 @@ func attitudeReaderSender() { // Send, if valid. // if isGPSGroundTrackValid(), etc. - makeFFAHRSSimReport() + // makeFFAHRSSimReport() // simultaneous use of GDL90 and FFSIM not supported in FF 7.5.1 or later. Function definition will be kept for AHRS debugging and future workarounds. makeAHRSGDL90Report() mySituation.mu_Attitude.Unlock() diff --git a/main/traffic.go b/main/traffic.go index 476d37ca..5a679541 100644 --- a/main/traffic.go +++ b/main/traffic.go @@ -104,11 +104,16 @@ func sendTrafficUpdates() { trafficMutex.Lock() defer trafficMutex.Unlock() cleanupOldEntries() - for _, ti := range traffic { + var msg []byte + for _, ti := range traffic { // TO-DO: Limit number of aircraft in traffic message. ForeFlight chokes at ~1000-2000 messages depending on iDevice RAM. Practical limit likely around 500-1000 aircraft if ti.Position_valid { - makeTrafficReport(ti) + msg = append(msg, makeTrafficReportMsg(ti)...) } } + + if len(msg) > 0 { + sendGDL90(msg, false) + } } // Send update to attached client. @@ -121,7 +126,7 @@ func registerTrafficUpdate(ti TrafficInfo) { trafficUpdate.Send(tiJSON) } -func makeTrafficReport(ti TrafficInfo) { +func makeTrafficReportMsg(ti TrafficInfo) []byte { msg := make([]byte, 28) // See p.16. msg[0] = 0x14 // Message type "Traffic Report". @@ -200,7 +205,7 @@ func makeTrafficReport(ti TrafficInfo) { msg[19+i] = c } - sendGDL90(prepareMessage(msg), false) + return prepareMessage(msg) } func parseDownlinkReport(s string) { @@ -567,6 +572,56 @@ func esListen() { } } +/* +updateDemoTraffic creates / updates a simulated traffic target for demonstration / debugging +purpose. Target will circle clockwise around the current GPS position (if valid) or around +KOSH, once every five minutes. + +Inputs are ICAO 24-bit hex code, tail number (8 chars max), relative altitude in feet, +groundspeed in knots, and bearing offset from 0 deg initial position. +*/ +func updateDemoTraffic(icao uint32, tail string, relAlt float32, gs float64, offset float64) { + var ti TrafficInfo + + hdg := float64((stratuxClock.Milliseconds/1000)%360) + offset + // gs := float64(220) // knots + radius := gs * 0.1 / (2 * math.Pi) + x := radius * math.Cos(hdg*math.Pi/180.0) + y := radius * math.Sin(hdg*math.Pi/180.0) + // default traffic location is Oshkosh if GPS not detected + lat := 43.99 + lng := -88.56 + if isGPSValid() { + lat = float64(mySituation.Lat) + lng = float64(mySituation.Lng) + } + traffRelLat := y / 60 + traffRelLng := -x / (60 * math.Cos(lat*math.Pi/180.0)) + + ti.Icao_addr = icao + ti.OnGround = false + ti.addr_type = 0 + ti.emitter_category = 1 + ti.Lat = float32(lat + traffRelLat) + ti.Lng = float32(lng + traffRelLng) + ti.Position_valid = true + ti.Alt = int32(mySituation.Alt + relAlt) + ti.Track = uint16(hdg) + ti.Speed = uint16(gs) + ti.Speed_valid = true + ti.Vvel = 0 + ti.Tail = tail // "DEMO1234" + ti.Last_seen = stratuxClock.Time + ti.Last_source = 1 + + // now insert this into the traffic map... + trafficMutex.Lock() + defer trafficMutex.Unlock() + traffic[ti.Icao_addr] = ti + registerTrafficUpdate(ti) + seenTraffic[ti.Icao_addr] = true +} + func initTraffic() { traffic = make(map[uint32]TrafficInfo) seenTraffic = make(map[uint32]bool) From f2afc27d8fa28a642ce1290310b1d8cd206123fb Mon Sep 17 00:00:00 2001 From: AvSquirrel Date: Mon, 15 Feb 2016 05:04:38 +0000 Subject: [PATCH 2/2] Improved message queue efficiency and status messaging --- main/network.go | 59 +++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/main/network.go b/main/network.go index b86d8346..1ac0a3d3 100644 --- a/main/network.go +++ b/main/network.go @@ -150,14 +150,22 @@ func sendToAllConnectedClients(msg networkMessage) { netconn.messageQueue = netconn.messageQueue[s:] } } - netconn.messageQueue = append(netconn.messageQueue, msg.msg) + netconn.messageQueue = append(netconn.messageQueue, msg.msg) // each netconn.messageQueue is therefore an array (well, a slice) of formatted GDL90 messages outSockets[k] = netconn } } } -// Just returns the number of DHCP leases for now. +// Returns the number of DHCP leases and prints queue lengths func getNetworkStats() uint { + for _, netconn := range outSockets { + queueBytes := 0 + for _, msg := range netconn.messageQueue { + queueBytes += len(msg) + } + log.Printf("On %s:%d, Queue length = %d messages / %d bytes\n", netconn.Ip, netconn.Port, len(netconn.messageQueue), queueBytes) + } + ret := uint(len(dhcpLeases)) globalStatus.Connected_Users = ret return ret @@ -223,13 +231,49 @@ func messageQueueSender() { if len(netconn.messageQueue) > 0 && !isSleeping(k) && !isThrottled(k) { averageSendableQueueSize += float64(len(netconn.messageQueue)) // Add num sendable messages. - tmpConn := netconn - tmpConn.Conn.Write(tmpConn.messageQueue[0]) + var queuedMsg []byte + + // Combine the first 256 entries in netconn.messageQueue to avoid flooding wlan0 with too many IOPS. + // Need to play nice with non-queued messages, so this limits the number of entries to combine. + // UAT uplink block is 432 bytes, so transmit block size shouldn't be larger than 108 KiB. 10 Mbps per device would therefore be needed to send within a 100 ms window. + + mqDepth := len(netconn.messageQueue) + if mqDepth > 256 { + mqDepth = 256 + } + + for j := 0; j < mqDepth; j++ { + queuedMsg = append(queuedMsg, netconn.messageQueue[j]...) + } + + /* + for j, _ := range netconn.messageQueue { + queuedMsg = append(queuedMsg, netconn.messageQueue[j]...) + } + */ + + netconn.Conn.Write(queuedMsg) totalNetworkMessagesSent++ globalStatus.NetworkDataMessagesSent++ - globalStatus.NetworkDataBytesSent += uint64(len(tmpConn.messageQueue[0])) - tmpConn.messageQueue = tmpConn.messageQueue[1:] - outSockets[k] = tmpConn + globalStatus.NetworkDataBytesSent += uint64(len(queuedMsg)) + + //netconn.messageQueue = [][]byte{} + if mqDepth < len(netconn.messageQueue) { + netconn.messageQueue = netconn.messageQueue[mqDepth:] + } else { + netconn.messageQueue = [][]byte{} + } + outSockets[k] = netconn + + /* + tmpConn := netconn + tmpConn.Conn.Write(tmpConn.messageQueue[0]) + totalNetworkMessagesSent++ + globalStatus.NetworkDataMessagesSent++ + globalStatus.NetworkDataBytesSent += uint64(len(tmpConn.messageQueue[0])) + tmpConn.messageQueue = tmpConn.messageQueue[1:] + outSockets[k] = tmpConn + */ } } @@ -381,6 +425,7 @@ func networkStatsCounter() { previousNetworkBytesSent = globalStatus.NetworkDataBytesSent previousNetworkMessagesSentNonqueueable = globalStatus.NetworkDataMessagesSentNonqueueable previousNetworkBytesSentNonqueueable = globalStatus.NetworkDataBytesSentNonqueueable + } }