Stratux replay.

Introduce queueable messages. Introduce sleepMode state for connections. Implemented #20 for FF.
pull/29/head
Christopher Young 2015-09-01 16:16:31 -04:00
rodzic 6de59749f0
commit c38c384eda
5 zmienionych plików z 128 dodań i 22 usunięć

Wyświetl plik

@ -14,10 +14,12 @@ import (
// http://www.faa.gov/nextgen/programs/adsb/wsa/media/GDL90_Public_ICD_RevA.PDF
const (
stratuxVersion = "v0.2"
configLocation = "/etc/stratux.conf"
managementAddr = ":80"
maxDatagramSize = 8192
stratuxVersion = "v0.2"
configLocation = "/etc/stratux.conf"
managementAddr = ":80"
maxDatagramSize = 8192
maxUserMsgQueueSize = 2500 // About 1MB per port per connected client.
UPLINK_BLOCK_DATA_BITS = 576
UPLINK_BLOCK_BITS = (UPLINK_BLOCK_DATA_BITS + 160)
UPLINK_BLOCK_DATA_BYTES = (UPLINK_BLOCK_DATA_BITS / 8)
@ -191,7 +193,7 @@ func makeOwnshipReport() bool {
msg[18] = 0x01 // "Light (ICAO) < 15,500 lbs"
sendGDL90(prepareMessage(msg))
sendGDL90(prepareMessage(msg), false)
return true
}
@ -212,7 +214,7 @@ func makeOwnshipGeometricAltitudeReport() bool {
msg[3] = 0x00
msg[4] = 0x0A
sendGDL90(prepareMessage(msg))
sendGDL90(prepareMessage(msg), false)
return true
}
@ -263,18 +265,18 @@ func relayMessage(msgtype uint16, msg []byte) {
ret[i+4] = msg[i]
}
sendGDL90(prepareMessage(ret))
sendGDL90(prepareMessage(ret), true)
}
func heartBeatSender() {
timer := time.NewTicker(1 * time.Second)
for {
<-timer.C
sendGDL90(makeHeartbeat())
sendGDL90(makeHeartbeat(), false)
// sendGDL90(makeTrafficReport())
makeOwnshipReport()
makeOwnshipGeometricAltitudeReport()
sendGDL90(makeInitializationMessage())
sendGDL90(makeInitializationMessage(), false)
sendTrafficUpdates()
updateStatus()
}
@ -393,7 +395,8 @@ func defaultSettings() {
globalSettings.UAT_Enabled = true //TODO
globalSettings.ES_Enabled = false //TODO
globalSettings.GPS_Enabled = false //TODO
globalSettings.NetworkOutputs = []networkConnection{{nil, "", 4000, NETWORK_GDL90_STANDARD}, {nil, "", 43211, NETWORK_GDL90_STANDARD | NETWORK_AHRS_GDL90}, {nil, "", 49002, NETWORK_AHRS_FFSIM}}
//FIXME: Need to change format below.
globalSettings.NetworkOutputs = []networkConnection{{nil, "", 4000, NETWORK_GDL90_STANDARD, false, nil}, {nil, "", 43211, NETWORK_GDL90_STANDARD | NETWORK_AHRS_GDL90, false, nil}, {nil, "", 49002, NETWORK_AHRS_FFSIM, false, nil}}
globalSettings.AHRS_Enabled = false
}

Wyświetl plik

@ -11,8 +11,10 @@ import (
)
type networkMessage struct {
msg []byte
msgType uint8
msg []byte
msgType uint8
queueable bool
ts time.Time
}
type networkConnection struct {
@ -20,6 +22,8 @@ type networkConnection struct {
Ip string
Port uint32
Capability uint8
sleepMode bool // Device is not able to receive messages currently.
sleepQueue [][]byte // Device message queue.
}
var messageQueue chan networkMessage
@ -61,9 +65,19 @@ func getDHCPLeases() (map[string]string, error) {
func sendToAllConnectedClients(msg networkMessage) {
netMutex.Lock()
defer netMutex.Unlock()
for _, netconn := range outSockets {
for k, netconn := range outSockets {
if (netconn.Capability & msg.msgType) != 0 { // Check if this port is able to accept the type of message we're sending.
netconn.Conn.Write(msg.msg)
// Check if the client is in sleep mode.
if !netconn.sleepMode { // Write immediately.
netconn.Conn.Write(msg.msg)
} else if msg.queueable { // Queue the message if the message is "queueable". Discard otherwise.
if len(netconn.sleepQueue) >= maxUserMsgQueueSize { // Too many messages queued? Client has been asleep for too long. Drop the oldest.
log.Printf("%s:%d - message queue overflow.\n", netconn.Ip, netconn.Port)
netconn.sleepQueue = netconn.sleepQueue[1:maxUserMsgQueueSize-1]
}
netconn.sleepQueue = append(netconn.sleepQueue, msg.msg)
outSockets[k] = netconn
}
}
}
}
@ -102,7 +116,12 @@ func refreshConnectedClients() {
log.Printf("DialUDP(%s): %s\n", ipAndPort, err.Error())
continue
}
outSockets[ipAndPort] = networkConnection{outConn, ip, networkOutput.Port, networkOutput.Capability}
newq := make([][]byte, 0)
sleepMode := false
if networkOutput.Port == 4000 { // Start off FF in sleep mode until something is received.
sleepMode = true
}
outSockets[ipAndPort] = networkConnection{Conn: outConn, Ip: ip, Port: networkOutput.Port, Capability: networkOutput.Capability, sleepMode: sleepMode, sleepQueue: newq}
}
validConnections[ipAndPort] = true
}
@ -117,6 +136,28 @@ func refreshConnectedClients() {
}
}
func checkMessageQueues() {
netMutex.Lock()
defer netMutex.Unlock()
for k, netconn := range outSockets {
if len(netconn.sleepQueue) > 0 && !netconn.sleepMode {
// Empty the sleep queue.
tmpQueue := netconn.sleepQueue
tmpConn := netconn.Conn
go func() {
time.Sleep(5 * time.Second) // Give it time to start listening, some apps send the "woke up" message too quickly.
for _, msg := range tmpQueue {
tmpConn.Write(msg)
time.Sleep(50 * time.Millisecond) // Slow down the sending, 20/sec.
}
}()
netconn.sleepQueue = make([][]byte, 0)
outSockets[k] = netconn
log.Printf("%s - emptied %d in queue.\n", k, len(tmpQueue))
}
}
}
func messageQueueSender() {
secondTimer := time.NewTicker(5 * time.Second)
for {
@ -125,16 +166,17 @@ func messageQueueSender() {
sendToAllConnectedClients(msg)
case <-secondTimer.C:
getNetworkStats()
checkMessageQueues()
}
}
}
func sendMsg(msg []byte, msgType uint8) {
messageQueue <- networkMessage{msg, msgType}
func sendMsg(msg []byte, msgType uint8, queueable bool) {
messageQueue <- networkMessage{msg: msg, msgType: msgType, queueable: queueable, ts: time.Now()}
}
func sendGDL90(msg []byte) {
sendMsg(msg, NETWORK_GDL90_STANDARD)
func sendGDL90(msg []byte, queueable bool) {
sendMsg(msg, NETWORK_GDL90_STANDARD, queueable)
}
func monitorDHCPLeases() {
@ -148,6 +190,56 @@ func monitorDHCPLeases() {
}
}
// Monitor clients going in/out of sleep mode. This will be different for different apps.
func sleepMonitor() {
// FF sleep mode.
addr := net.UDPAddr{Port: 50113, IP: net.ParseIP("0.0.0.0")}
conn, err := net.ListenUDP("udp", &addr)
defer conn.Close()
if err != nil {
log.Printf("err: %s\n", err.Error())
log.Printf("error listening on port 50113 (FF comm) - assuming FF is always awake (if connected).\n")
return
}
for {
buf := make([]byte, 1024)
n, addr, err := conn.ReadFrom(buf)
ipAndPort := strings.Split(addr.String(), ":")
ip := ipAndPort[0]
if err != nil {
log.Printf("err: %s\n", err.Error())
return
}
// Got message, check if it's in the correct format.
if n < 3 || buf[0] != 0xFF || buf[1] != 0xFE {
continue
}
s := string(buf[2:n])
s = strings.Replace(s, "\x00", "", -1)
ffIpAndPort := ip + ":4000"
netMutex.Lock()
p, ok := outSockets[ffIpAndPort]
if !ok {
// Can't do anything, the client isn't even technically connected.
netMutex.Unlock()
continue
}
if strings.HasPrefix(s, "i-want-to-play-ffm-udp") || strings.HasPrefix(s, "i-can-play-ffm-udp") {
if p.sleepMode {
log.Printf("%s - woke up\n", ffIpAndPort)
p.sleepMode = false
}
} else if strings.HasPrefix(s, "i-cannot-play-ffm-udp") {
if !p.sleepMode {
log.Printf("%s - went to sleep\n", ffIpAndPort)
p.sleepMode = true
}
}
outSockets[ffIpAndPort] = p
netMutex.Unlock()
}
}
func initNetwork() {
messageQueue = make(chan networkMessage, 1024) // Buffered channel, 1024 messages.
outSockets = make(map[string]networkConnection)
@ -155,4 +247,5 @@ func initNetwork() {
refreshConnectedClients()
go monitorDHCPLeases()
go messageQueueSender()
go sleepMonitor()
}

Wyświetl plik

@ -262,7 +262,7 @@ func tempAndPressureReader() {
func makeFFAHRSSimReport() {
s := fmt.Sprintf("XATTStratux,%f,%f,%f", mySituation.gyro_heading, mySituation.pitch, mySituation.roll)
sendMsg([]byte(s), NETWORK_AHRS_FFSIM)
sendMsg([]byte(s), NETWORK_AHRS_FFSIM, false)
}
func makeAHRSGDL90Report() {
@ -303,7 +303,7 @@ func makeAHRSGDL90Report() {
msg[14] = byte((g >> 8) & 0xFF)
msg[15] = byte(g & 0xFF)
sendMsg(prepareMessage(msg), NETWORK_AHRS_GDL90)
sendMsg(prepareMessage(msg), NETWORK_AHRS_GDL90, false)
}
func attitudeReaderSender() {

Wyświetl plik

@ -160,7 +160,7 @@ func makeTrafficReport(ti TrafficInfo) {
//TODO: text identifier (tail).
sendGDL90(prepareMessage(msg))
sendGDL90(prepareMessage(msg), false)
}
func parseDownlinkReport(s string) {

10
notes/protocol.txt 100644
Wyświetl plik

@ -0,0 +1,10 @@
Not queueable:
makeOwnshipReport() [each second]
makeOwnshipGeometricAltitudeReport() [each second]
makeHeartbeat() [each second]
makeInitializationMessage() [each second]
makeTrafficReport() [re-generated each second depending on target status]
makeFFAHRSSimReport() [short useful life]
makeAHRSGDL90Report() [short useful life]
Queueable:
relayMessage() [all uplink messages]