diff --git a/main/gen_gdl90.go b/main/gen_gdl90.go index f80b48ed..69a967e1 100644 --- a/main/gen_gdl90.go +++ b/main/gen_gdl90.go @@ -14,10 +14,12 @@ import ( // http://www.faa.gov/nextgen/programs/adsb/wsa/media/GDL90_Public_ICD_RevA.PDF const ( - stratuxVersion = "v0.2" - configLocation = "/etc/stratux.conf" - managementAddr = ":80" - maxDatagramSize = 8192 + stratuxVersion = "v0.2" + configLocation = "/etc/stratux.conf" + managementAddr = ":80" + maxDatagramSize = 8192 + maxUserMsgQueueSize = 2500 // About 1MB per port per connected client. + UPLINK_BLOCK_DATA_BITS = 576 UPLINK_BLOCK_BITS = (UPLINK_BLOCK_DATA_BITS + 160) UPLINK_BLOCK_DATA_BYTES = (UPLINK_BLOCK_DATA_BITS / 8) @@ -191,7 +193,7 @@ func makeOwnshipReport() bool { msg[18] = 0x01 // "Light (ICAO) < 15,500 lbs" - sendGDL90(prepareMessage(msg)) + sendGDL90(prepareMessage(msg), false) return true } @@ -212,7 +214,7 @@ func makeOwnshipGeometricAltitudeReport() bool { msg[3] = 0x00 msg[4] = 0x0A - sendGDL90(prepareMessage(msg)) + sendGDL90(prepareMessage(msg), false) return true } @@ -263,18 +265,18 @@ func relayMessage(msgtype uint16, msg []byte) { ret[i+4] = msg[i] } - sendGDL90(prepareMessage(ret)) + sendGDL90(prepareMessage(ret), true) } func heartBeatSender() { timer := time.NewTicker(1 * time.Second) for { <-timer.C - sendGDL90(makeHeartbeat()) + sendGDL90(makeHeartbeat(), false) // sendGDL90(makeTrafficReport()) makeOwnshipReport() makeOwnshipGeometricAltitudeReport() - sendGDL90(makeInitializationMessage()) + sendGDL90(makeInitializationMessage(), false) sendTrafficUpdates() updateStatus() } @@ -393,7 +395,8 @@ func defaultSettings() { globalSettings.UAT_Enabled = true //TODO globalSettings.ES_Enabled = false //TODO globalSettings.GPS_Enabled = false //TODO - globalSettings.NetworkOutputs = []networkConnection{{nil, "", 4000, NETWORK_GDL90_STANDARD}, {nil, "", 43211, NETWORK_GDL90_STANDARD | NETWORK_AHRS_GDL90}, {nil, "", 49002, NETWORK_AHRS_FFSIM}} + //FIXME: Need to change format below. + globalSettings.NetworkOutputs = []networkConnection{{nil, "", 4000, NETWORK_GDL90_STANDARD, false, nil}, {nil, "", 43211, NETWORK_GDL90_STANDARD | NETWORK_AHRS_GDL90, false, nil}, {nil, "", 49002, NETWORK_AHRS_FFSIM, false, nil}} globalSettings.AHRS_Enabled = false } diff --git a/main/network.go b/main/network.go index 71268eea..34771c02 100644 --- a/main/network.go +++ b/main/network.go @@ -11,8 +11,10 @@ import ( ) type networkMessage struct { - msg []byte - msgType uint8 + msg []byte + msgType uint8 + queueable bool + ts time.Time } type networkConnection struct { @@ -20,6 +22,8 @@ type networkConnection struct { Ip string Port uint32 Capability uint8 + sleepMode bool // Device is not able to receive messages currently. + sleepQueue [][]byte // Device message queue. } var messageQueue chan networkMessage @@ -61,9 +65,19 @@ func getDHCPLeases() (map[string]string, error) { func sendToAllConnectedClients(msg networkMessage) { netMutex.Lock() defer netMutex.Unlock() - for _, netconn := range outSockets { + for k, netconn := range outSockets { if (netconn.Capability & msg.msgType) != 0 { // Check if this port is able to accept the type of message we're sending. - netconn.Conn.Write(msg.msg) + // Check if the client is in sleep mode. + if !netconn.sleepMode { // Write immediately. + netconn.Conn.Write(msg.msg) + } else if msg.queueable { // Queue the message if the message is "queueable". Discard otherwise. + if len(netconn.sleepQueue) >= maxUserMsgQueueSize { // Too many messages queued? Client has been asleep for too long. Drop the oldest. + log.Printf("%s:%d - message queue overflow.\n", netconn.Ip, netconn.Port) + netconn.sleepQueue = netconn.sleepQueue[1:maxUserMsgQueueSize-1] + } + netconn.sleepQueue = append(netconn.sleepQueue, msg.msg) + outSockets[k] = netconn + } } } } @@ -102,7 +116,12 @@ func refreshConnectedClients() { log.Printf("DialUDP(%s): %s\n", ipAndPort, err.Error()) continue } - outSockets[ipAndPort] = networkConnection{outConn, ip, networkOutput.Port, networkOutput.Capability} + newq := make([][]byte, 0) + sleepMode := false + if networkOutput.Port == 4000 { // Start off FF in sleep mode until something is received. + sleepMode = true + } + outSockets[ipAndPort] = networkConnection{Conn: outConn, Ip: ip, Port: networkOutput.Port, Capability: networkOutput.Capability, sleepMode: sleepMode, sleepQueue: newq} } validConnections[ipAndPort] = true } @@ -117,6 +136,28 @@ func refreshConnectedClients() { } } +func checkMessageQueues() { + netMutex.Lock() + defer netMutex.Unlock() + for k, netconn := range outSockets { + if len(netconn.sleepQueue) > 0 && !netconn.sleepMode { + // Empty the sleep queue. + tmpQueue := netconn.sleepQueue + tmpConn := netconn.Conn + go func() { + time.Sleep(5 * time.Second) // Give it time to start listening, some apps send the "woke up" message too quickly. + for _, msg := range tmpQueue { + tmpConn.Write(msg) + time.Sleep(50 * time.Millisecond) // Slow down the sending, 20/sec. + } + }() + netconn.sleepQueue = make([][]byte, 0) + outSockets[k] = netconn + log.Printf("%s - emptied %d in queue.\n", k, len(tmpQueue)) + } + } +} + func messageQueueSender() { secondTimer := time.NewTicker(5 * time.Second) for { @@ -125,16 +166,17 @@ func messageQueueSender() { sendToAllConnectedClients(msg) case <-secondTimer.C: getNetworkStats() + checkMessageQueues() } } } -func sendMsg(msg []byte, msgType uint8) { - messageQueue <- networkMessage{msg, msgType} +func sendMsg(msg []byte, msgType uint8, queueable bool) { + messageQueue <- networkMessage{msg: msg, msgType: msgType, queueable: queueable, ts: time.Now()} } -func sendGDL90(msg []byte) { - sendMsg(msg, NETWORK_GDL90_STANDARD) +func sendGDL90(msg []byte, queueable bool) { + sendMsg(msg, NETWORK_GDL90_STANDARD, queueable) } func monitorDHCPLeases() { @@ -148,6 +190,56 @@ func monitorDHCPLeases() { } } +// Monitor clients going in/out of sleep mode. This will be different for different apps. +func sleepMonitor() { + // FF sleep mode. + addr := net.UDPAddr{Port: 50113, IP: net.ParseIP("0.0.0.0")} + conn, err := net.ListenUDP("udp", &addr) + defer conn.Close() + if err != nil { + log.Printf("err: %s\n", err.Error()) + log.Printf("error listening on port 50113 (FF comm) - assuming FF is always awake (if connected).\n") + return + } + for { + buf := make([]byte, 1024) + n, addr, err := conn.ReadFrom(buf) + ipAndPort := strings.Split(addr.String(), ":") + ip := ipAndPort[0] + if err != nil { + log.Printf("err: %s\n", err.Error()) + return + } + // Got message, check if it's in the correct format. + if n < 3 || buf[0] != 0xFF || buf[1] != 0xFE { + continue + } + s := string(buf[2:n]) + s = strings.Replace(s, "\x00", "", -1) + ffIpAndPort := ip + ":4000" + netMutex.Lock() + p, ok := outSockets[ffIpAndPort] + if !ok { + // Can't do anything, the client isn't even technically connected. + netMutex.Unlock() + continue + } + if strings.HasPrefix(s, "i-want-to-play-ffm-udp") || strings.HasPrefix(s, "i-can-play-ffm-udp") { + if p.sleepMode { + log.Printf("%s - woke up\n", ffIpAndPort) + p.sleepMode = false + } + } else if strings.HasPrefix(s, "i-cannot-play-ffm-udp") { + if !p.sleepMode { + log.Printf("%s - went to sleep\n", ffIpAndPort) + p.sleepMode = true + } + } + outSockets[ffIpAndPort] = p + netMutex.Unlock() + } +} + func initNetwork() { messageQueue = make(chan networkMessage, 1024) // Buffered channel, 1024 messages. outSockets = make(map[string]networkConnection) @@ -155,4 +247,5 @@ func initNetwork() { refreshConnectedClients() go monitorDHCPLeases() go messageQueueSender() + go sleepMonitor() } diff --git a/main/ry835ai.go b/main/ry835ai.go index 2f70aa95..27c70df8 100644 --- a/main/ry835ai.go +++ b/main/ry835ai.go @@ -262,7 +262,7 @@ func tempAndPressureReader() { func makeFFAHRSSimReport() { s := fmt.Sprintf("XATTStratux,%f,%f,%f", mySituation.gyro_heading, mySituation.pitch, mySituation.roll) - sendMsg([]byte(s), NETWORK_AHRS_FFSIM) + sendMsg([]byte(s), NETWORK_AHRS_FFSIM, false) } func makeAHRSGDL90Report() { @@ -303,7 +303,7 @@ func makeAHRSGDL90Report() { msg[14] = byte((g >> 8) & 0xFF) msg[15] = byte(g & 0xFF) - sendMsg(prepareMessage(msg), NETWORK_AHRS_GDL90) + sendMsg(prepareMessage(msg), NETWORK_AHRS_GDL90, false) } func attitudeReaderSender() { diff --git a/main/traffic.go b/main/traffic.go index d271cfdc..a216ebeb 100644 --- a/main/traffic.go +++ b/main/traffic.go @@ -160,7 +160,7 @@ func makeTrafficReport(ti TrafficInfo) { //TODO: text identifier (tail). - sendGDL90(prepareMessage(msg)) + sendGDL90(prepareMessage(msg), false) } func parseDownlinkReport(s string) { diff --git a/notes/protocol.txt b/notes/protocol.txt new file mode 100644 index 00000000..67ac8306 --- /dev/null +++ b/notes/protocol.txt @@ -0,0 +1,10 @@ +Not queueable: + makeOwnshipReport() [each second] + makeOwnshipGeometricAltitudeReport() [each second] + makeHeartbeat() [each second] + makeInitializationMessage() [each second] + makeTrafficReport() [re-generated each second depending on target status] + makeFFAHRSSimReport() [short useful life] + makeAHRSGDL90Report() [short useful life] +Queueable: + relayMessage() [all uplink messages]