diff --git a/main/gen_gdl90.go b/main/gen_gdl90.go index ee069acf..6fa29e97 100644 --- a/main/gen_gdl90.go +++ b/main/gen_gdl90.go @@ -528,7 +528,7 @@ func defaultSettings() { globalSettings.ES_Enabled = false //TODO globalSettings.GPS_Enabled = false //TODO //FIXME: Need to change format below. - globalSettings.NetworkOutputs = []networkConnection{{nil, "", 4000, NETWORK_GDL90_STANDARD, false, nil}, {nil, "", 43211, NETWORK_GDL90_STANDARD | NETWORK_AHRS_GDL90, false, nil}, {nil, "", 49002, NETWORK_AHRS_FFSIM, false, nil}} + globalSettings.NetworkOutputs = []networkConnection{{nil, "", 4000, NETWORK_GDL90_STANDARD, nil, time.Time{}, time.Time{}}, {nil, "", 43211, NETWORK_GDL90_STANDARD | NETWORK_AHRS_GDL90, nil, time.Time{}, time.Time{}}, {nil, "", 49002, NETWORK_AHRS_FFSIM, nil, time.Time{}, time.Time{}}} globalSettings.AHRS_Enabled = false globalSettings.DEBUG = false globalSettings.ReplayLog = false //TODO: 'true' for debug builds. diff --git a/main/network.go b/main/network.go index 06240f7e..c7ddba3f 100644 --- a/main/network.go +++ b/main/network.go @@ -1,9 +1,14 @@ package main import ( + "golang.org/x/net/icmp" + "golang.org/x/net/internal/iana" + "golang.org/x/net/ipv4" "io/ioutil" "log" + "math/rand" "net" + "os" "strconv" "strings" "sync" @@ -18,12 +23,17 @@ type networkMessage struct { } type networkConnection struct { - Conn *net.UDPConn - Ip string - Port uint32 - Capability uint8 - sleepMode bool // Device is not able to receive messages currently. - sleepQueue [][]byte // Device message queue. + Conn *net.UDPConn + Ip string + Port uint32 + Capability uint8 + messageQueue [][]byte // Device message queue. + /* + Sleep mode/throttle variables. "sleep mode" is actually now just a very reduced packet rate, since we don't know positively + when a client is ready to accept packets - we just assume so if we don't receive ICMP Unreachable packets in 5 secs. + */ + lastUnreachable time.Time // Last time the device sent an ICMP Unreachable packet. + nextMessageTime time.Time // The next time that the device is "able" to receive a message. } var messageQueue chan networkMessage @@ -31,6 +41,8 @@ var outSockets map[string]networkConnection var dhcpLeases map[string]string var netMutex *sync.Mutex +var pingResponse map[string]time.Time // Last time an IP responded to an "echo" response. + const ( NETWORK_GDL90_STANDARD = 1 NETWORK_AHRS_FFSIM = 2 @@ -65,22 +77,48 @@ func getDHCPLeases() (map[string]string, error) { return ret, nil } +func isSleeping(k string) bool { + ipAndPort := strings.Split(k, ":") + lastPing, ok := pingResponse[ipAndPort[0]] + // No ping response. Assume disconnected/sleeping device. + if !ok || time.Since(lastPing) > (10*time.Second) { + return true + } + if time.Since(outSockets[k].lastUnreachable) < (5 * time.Second) { + return true + } + return false +} + +// Throttle mode for testing port open and giving some start-up time to the app. +// Throttling is 0.1% data rate for first 15 seconds. +func isThrottled(k string) bool { + return (rand.Int()%1000 != 0) && time.Since(outSockets[k].lastUnreachable) < (15*time.Second) +} + func sendToAllConnectedClients(msg networkMessage) { netMutex.Lock() defer netMutex.Unlock() for k, netconn := range outSockets { - if (netconn.Capability & msg.msgType) != 0 { // Check if this port is able to accept the type of message we're sending. - // Check if the client is in sleep mode. - if !netconn.sleepMode { // Write immediately. - netconn.Conn.Write(msg.msg) - } else if msg.queueable { // Queue the message if the message is "queueable". Discard otherwise. - if len(netconn.sleepQueue) >= maxUserMsgQueueSize { // Too many messages queued? Client has been asleep for too long. Drop the oldest. - log.Printf("%s:%d - message queue overflow.\n", netconn.Ip, netconn.Port) - netconn.sleepQueue = netconn.sleepQueue[1:maxUserMsgQueueSize-1] - } - netconn.sleepQueue = append(netconn.sleepQueue, msg.msg) - outSockets[k] = netconn + // Check if this port is able to accept the type of message we're sending. + if (netconn.Capability & msg.msgType) == 0 { + continue + } + // Send non-queueable messages immediately, or discard if the client is in sleep mode. + if !msg.queueable { + if !isSleeping(k) { + netconn.Conn.Write(msg.msg) // Write immediately. + } else { + log.Printf("sleepy %s\n", k) } + } else { + // Queue the message if the message is "queueable". + if len(netconn.messageQueue) >= maxUserMsgQueueSize { // Too many messages queued? Drop the oldest. + log.Printf("%s:%d - message queue overflow.\n", netconn.Ip, netconn.Port) + netconn.messageQueue = netconn.messageQueue[1 : maxUserMsgQueueSize-1] + } + netconn.messageQueue = append(netconn.messageQueue, msg.msg) + outSockets[k] = netconn } } } @@ -120,12 +158,7 @@ func refreshConnectedClients() { continue } newq := make([][]byte, 0) - sleepMode := false -/* if networkOutput.Port == 4000 { // Start off FF in sleep mode until something is received. - sleepMode = true - } -*/ - outSockets[ipAndPort] = networkConnection{Conn: outConn, Ip: ip, Port: networkOutput.Port, Capability: networkOutput.Capability, sleepMode: sleepMode, sleepQueue: newq} + outSockets[ipAndPort] = networkConnection{Conn: outConn, Ip: ip, Port: networkOutput.Port, Capability: networkOutput.Capability, messageQueue: newq} } validConnections[ipAndPort] = true } @@ -140,37 +173,26 @@ func refreshConnectedClients() { } } -func checkMessageQueues() { - netMutex.Lock() - defer netMutex.Unlock() - for k, netconn := range outSockets { - if len(netconn.sleepQueue) > 0 && !netconn.sleepMode { - // Empty the sleep queue. - tmpQueue := netconn.sleepQueue - tmpConn := netconn.Conn - go func() { - time.Sleep(5 * time.Second) // Give it time to start listening, some apps send the "woke up" message too quickly. - for _, msg := range tmpQueue { - tmpConn.Write(msg) - time.Sleep(5 * time.Millisecond) // Slow down the sending, 200/sec. - } - }() - netconn.sleepQueue = make([][]byte, 0) - outSockets[k] = netconn - log.Printf("%s - emptied %d in queue.\n", k, len(tmpQueue)) - } - } -} - func messageQueueSender() { secondTimer := time.NewTicker(5 * time.Second) + queueTimer := time.NewTicker(400 * time.Microsecond) // 2500 msg/sec for { select { case msg := <-messageQueue: sendToAllConnectedClients(msg) + case <-queueTimer.C: + netMutex.Lock() + for k, netconn := range outSockets { + if len(netconn.messageQueue) > 0 && !isSleeping(k) && !isThrottled(k) { + tmpConn := netconn + tmpConn.Conn.Write(tmpConn.messageQueue[0]) + tmpConn.messageQueue = tmpConn.messageQueue[1:] + outSockets[k] = tmpConn + } + } + netMutex.Unlock() case <-secondTimer.C: getNetworkStats() - checkMessageQueues() } } } @@ -194,52 +216,93 @@ func monitorDHCPLeases() { } } -// Monitor clients going in/out of sleep mode. This will be different for different apps. +func icmpEchoSender(c *icmp.PacketConn) { + timer := time.NewTicker(5 * time.Second) + for { + <-timer.C + // Collect IPs. + ips := make(map[string]bool) + for k, _ := range outSockets { + ipAndPort := strings.Split(k, ":") + ips[ipAndPort[0]] = true + } + // Send to all IPs. + for ip, _ := range ips { + wm := icmp.Message{ + Type: ipv4.ICMPTypeEcho, Code: 0, + Body: &icmp.Echo{ + ID: os.Getpid() & 0xffff, Seq: 1, + Data: []byte("STRATUX"), + }, + } + wb, err := wm.Marshal(nil) + if err != nil { + log.Printf("couldn't send ICMP Echo: %s\n", err.Error()) + continue + } + if _, err := c.WriteTo(wb, &net.IPAddr{IP: net.ParseIP(ip)}); err != nil { + log.Printf("couldn't send ICMP Echo: %s\n", err.Error()) + continue + } + } + } +} + +// Monitor clients going in/out of sleep mode via ICMP unreachable packets. func sleepMonitor() { - // FF sleep mode. - addr := net.UDPAddr{Port: 50113, IP: net.ParseIP("0.0.0.0")} - conn, err := net.ListenUDP("udp", &addr) + c, err := icmp.ListenPacket("ip4:icmp", "0.0.0.0") if err != nil { - log.Printf("err: %s\n", err.Error()) - log.Printf("error listening on port 50113 (FF comm) - assuming FF is always awake (if connected).\n") + log.Printf("error listening for udp - sending data to all ports for all connected clients. err: %s", err) return } - defer conn.Close() + go icmpEchoSender(c) + defer c.Close() for { - buf := make([]byte, 1024) - n, addr, err := conn.ReadFrom(buf) - ipAndPort := strings.Split(addr.String(), ":") - ip := ipAndPort[0] + buf := make([]byte, 1500) + n, peer, err := c.ReadFrom(buf) if err != nil { - log.Printf("err: %s\n", err.Error()) - return - } - // Got message, check if it's in the correct format. - if n < 3 || buf[0] != 0xFF || buf[1] != 0xFE { + log.Printf("%s\n", err.Error()) continue } - s := string(buf[2:n]) - s = strings.Replace(s, "\x00", "", -1) - ffIpAndPort := ip + ":4000" + msg, err := icmp.ParseMessage(iana.ProtocolICMP, buf[:n]) + if err != nil { + continue + } + + ip := peer.String() + + // Look for echo replies, mark it as received. + if msg.Type == ipv4.ICMPTypeEchoReply { + pingResponse[ip] = time.Now() + continue // No further processing needed. + } + + // Only deal with ICMP Unreachable packets (since that's what iOS and Android seem to be sending whenever the apps are not available). + if msg.Type != ipv4.ICMPTypeDestinationUnreachable { + continue + } + // Packet parsing. + mb, err := msg.Body.Marshal(iana.ProtocolICMP) + if err != nil { + continue + } + if len(mb) < 28 { + continue + } + + // The unreachable port. + port := (uint16(mb[26]) << 8) | uint16(mb[27]) + ipAndPort := ip + ":" + strconv.Itoa(int(port)) + netMutex.Lock() - p, ok := outSockets[ffIpAndPort] + p, ok := outSockets[ipAndPort] if !ok { // Can't do anything, the client isn't even technically connected. netMutex.Unlock() continue } - if strings.HasPrefix(s, "i-want-to-play-ffm-udp") || strings.HasPrefix(s, "i-can-play-ffm-udp") { - if p.sleepMode { - log.Printf("%s - woke up\n", ffIpAndPort) - p.sleepMode = false - } - } else if strings.HasPrefix(s, "i-cannot-play-ffm-udp") { - if !p.sleepMode { - log.Printf("%s - went to sleep\n", ffIpAndPort) - p.sleepMode = true - } - } - outSockets[ffIpAndPort] = p + p.lastUnreachable = time.Now() + outSockets[ipAndPort] = p netMutex.Unlock() } } @@ -247,6 +310,7 @@ func sleepMonitor() { func initNetwork() { messageQueue = make(chan networkMessage, 1024) // Buffered channel, 1024 messages. outSockets = make(map[string]networkConnection) + pingResponse = make(map[string]time.Time) netMutex = &sync.Mutex{} refreshConnectedClients() go monitorDHCPLeases()