Redo sleep mode - now based on ICMP echo/unreachable packets.

pull/58/head
Christopher Young 2015-09-15 11:29:41 -04:00
rodzic 3a6a0b57ec
commit 5dda5d6220
2 zmienionych plików z 143 dodań i 79 usunięć

Wyświetl plik

@ -528,7 +528,7 @@ func defaultSettings() {
globalSettings.ES_Enabled = false //TODO
globalSettings.GPS_Enabled = false //TODO
//FIXME: Need to change format below.
globalSettings.NetworkOutputs = []networkConnection{{nil, "", 4000, NETWORK_GDL90_STANDARD, false, nil}, {nil, "", 43211, NETWORK_GDL90_STANDARD | NETWORK_AHRS_GDL90, false, nil}, {nil, "", 49002, NETWORK_AHRS_FFSIM, false, nil}}
globalSettings.NetworkOutputs = []networkConnection{{nil, "", 4000, NETWORK_GDL90_STANDARD, nil, time.Time{}, time.Time{}}, {nil, "", 43211, NETWORK_GDL90_STANDARD | NETWORK_AHRS_GDL90, nil, time.Time{}, time.Time{}}, {nil, "", 49002, NETWORK_AHRS_FFSIM, nil, time.Time{}, time.Time{}}}
globalSettings.AHRS_Enabled = false
globalSettings.DEBUG = false
globalSettings.ReplayLog = false //TODO: 'true' for debug builds.

Wyświetl plik

@ -1,9 +1,14 @@
package main
import (
"golang.org/x/net/icmp"
"golang.org/x/net/internal/iana"
"golang.org/x/net/ipv4"
"io/ioutil"
"log"
"math/rand"
"net"
"os"
"strconv"
"strings"
"sync"
@ -18,12 +23,17 @@ type networkMessage struct {
}
type networkConnection struct {
Conn *net.UDPConn
Ip string
Port uint32
Capability uint8
sleepMode bool // Device is not able to receive messages currently.
sleepQueue [][]byte // Device message queue.
Conn *net.UDPConn
Ip string
Port uint32
Capability uint8
messageQueue [][]byte // Device message queue.
/*
Sleep mode/throttle variables. "sleep mode" is actually now just a very reduced packet rate, since we don't know positively
when a client is ready to accept packets - we just assume so if we don't receive ICMP Unreachable packets in 5 secs.
*/
lastUnreachable time.Time // Last time the device sent an ICMP Unreachable packet.
nextMessageTime time.Time // The next time that the device is "able" to receive a message.
}
var messageQueue chan networkMessage
@ -31,6 +41,8 @@ var outSockets map[string]networkConnection
var dhcpLeases map[string]string
var netMutex *sync.Mutex
var pingResponse map[string]time.Time // Last time an IP responded to an "echo" response.
const (
NETWORK_GDL90_STANDARD = 1
NETWORK_AHRS_FFSIM = 2
@ -65,22 +77,48 @@ func getDHCPLeases() (map[string]string, error) {
return ret, nil
}
func isSleeping(k string) bool {
ipAndPort := strings.Split(k, ":")
lastPing, ok := pingResponse[ipAndPort[0]]
// No ping response. Assume disconnected/sleeping device.
if !ok || time.Since(lastPing) > (10*time.Second) {
return true
}
if time.Since(outSockets[k].lastUnreachable) < (5 * time.Second) {
return true
}
return false
}
// Throttle mode for testing port open and giving some start-up time to the app.
// Throttling is 0.1% data rate for first 15 seconds.
func isThrottled(k string) bool {
return (rand.Int()%1000 != 0) && time.Since(outSockets[k].lastUnreachable) < (15*time.Second)
}
func sendToAllConnectedClients(msg networkMessage) {
netMutex.Lock()
defer netMutex.Unlock()
for k, netconn := range outSockets {
if (netconn.Capability & msg.msgType) != 0 { // Check if this port is able to accept the type of message we're sending.
// Check if the client is in sleep mode.
if !netconn.sleepMode { // Write immediately.
netconn.Conn.Write(msg.msg)
} else if msg.queueable { // Queue the message if the message is "queueable". Discard otherwise.
if len(netconn.sleepQueue) >= maxUserMsgQueueSize { // Too many messages queued? Client has been asleep for too long. Drop the oldest.
log.Printf("%s:%d - message queue overflow.\n", netconn.Ip, netconn.Port)
netconn.sleepQueue = netconn.sleepQueue[1:maxUserMsgQueueSize-1]
}
netconn.sleepQueue = append(netconn.sleepQueue, msg.msg)
outSockets[k] = netconn
// Check if this port is able to accept the type of message we're sending.
if (netconn.Capability & msg.msgType) == 0 {
continue
}
// Send non-queueable messages immediately, or discard if the client is in sleep mode.
if !msg.queueable {
if !isSleeping(k) {
netconn.Conn.Write(msg.msg) // Write immediately.
} else {
log.Printf("sleepy %s\n", k)
}
} else {
// Queue the message if the message is "queueable".
if len(netconn.messageQueue) >= maxUserMsgQueueSize { // Too many messages queued? Drop the oldest.
log.Printf("%s:%d - message queue overflow.\n", netconn.Ip, netconn.Port)
netconn.messageQueue = netconn.messageQueue[1 : maxUserMsgQueueSize-1]
}
netconn.messageQueue = append(netconn.messageQueue, msg.msg)
outSockets[k] = netconn
}
}
}
@ -120,12 +158,7 @@ func refreshConnectedClients() {
continue
}
newq := make([][]byte, 0)
sleepMode := false
/* if networkOutput.Port == 4000 { // Start off FF in sleep mode until something is received.
sleepMode = true
}
*/
outSockets[ipAndPort] = networkConnection{Conn: outConn, Ip: ip, Port: networkOutput.Port, Capability: networkOutput.Capability, sleepMode: sleepMode, sleepQueue: newq}
outSockets[ipAndPort] = networkConnection{Conn: outConn, Ip: ip, Port: networkOutput.Port, Capability: networkOutput.Capability, messageQueue: newq}
}
validConnections[ipAndPort] = true
}
@ -140,37 +173,26 @@ func refreshConnectedClients() {
}
}
func checkMessageQueues() {
netMutex.Lock()
defer netMutex.Unlock()
for k, netconn := range outSockets {
if len(netconn.sleepQueue) > 0 && !netconn.sleepMode {
// Empty the sleep queue.
tmpQueue := netconn.sleepQueue
tmpConn := netconn.Conn
go func() {
time.Sleep(5 * time.Second) // Give it time to start listening, some apps send the "woke up" message too quickly.
for _, msg := range tmpQueue {
tmpConn.Write(msg)
time.Sleep(5 * time.Millisecond) // Slow down the sending, 200/sec.
}
}()
netconn.sleepQueue = make([][]byte, 0)
outSockets[k] = netconn
log.Printf("%s - emptied %d in queue.\n", k, len(tmpQueue))
}
}
}
func messageQueueSender() {
secondTimer := time.NewTicker(5 * time.Second)
queueTimer := time.NewTicker(400 * time.Microsecond) // 2500 msg/sec
for {
select {
case msg := <-messageQueue:
sendToAllConnectedClients(msg)
case <-queueTimer.C:
netMutex.Lock()
for k, netconn := range outSockets {
if len(netconn.messageQueue) > 0 && !isSleeping(k) && !isThrottled(k) {
tmpConn := netconn
tmpConn.Conn.Write(tmpConn.messageQueue[0])
tmpConn.messageQueue = tmpConn.messageQueue[1:]
outSockets[k] = tmpConn
}
}
netMutex.Unlock()
case <-secondTimer.C:
getNetworkStats()
checkMessageQueues()
}
}
}
@ -194,52 +216,93 @@ func monitorDHCPLeases() {
}
}
// Monitor clients going in/out of sleep mode. This will be different for different apps.
func icmpEchoSender(c *icmp.PacketConn) {
timer := time.NewTicker(5 * time.Second)
for {
<-timer.C
// Collect IPs.
ips := make(map[string]bool)
for k, _ := range outSockets {
ipAndPort := strings.Split(k, ":")
ips[ipAndPort[0]] = true
}
// Send to all IPs.
for ip, _ := range ips {
wm := icmp.Message{
Type: ipv4.ICMPTypeEcho, Code: 0,
Body: &icmp.Echo{
ID: os.Getpid() & 0xffff, Seq: 1,
Data: []byte("STRATUX"),
},
}
wb, err := wm.Marshal(nil)
if err != nil {
log.Printf("couldn't send ICMP Echo: %s\n", err.Error())
continue
}
if _, err := c.WriteTo(wb, &net.IPAddr{IP: net.ParseIP(ip)}); err != nil {
log.Printf("couldn't send ICMP Echo: %s\n", err.Error())
continue
}
}
}
}
// Monitor clients going in/out of sleep mode via ICMP unreachable packets.
func sleepMonitor() {
// FF sleep mode.
addr := net.UDPAddr{Port: 50113, IP: net.ParseIP("0.0.0.0")}
conn, err := net.ListenUDP("udp", &addr)
c, err := icmp.ListenPacket("ip4:icmp", "0.0.0.0")
if err != nil {
log.Printf("err: %s\n", err.Error())
log.Printf("error listening on port 50113 (FF comm) - assuming FF is always awake (if connected).\n")
log.Printf("error listening for udp - sending data to all ports for all connected clients. err: %s", err)
return
}
defer conn.Close()
go icmpEchoSender(c)
defer c.Close()
for {
buf := make([]byte, 1024)
n, addr, err := conn.ReadFrom(buf)
ipAndPort := strings.Split(addr.String(), ":")
ip := ipAndPort[0]
buf := make([]byte, 1500)
n, peer, err := c.ReadFrom(buf)
if err != nil {
log.Printf("err: %s\n", err.Error())
return
}
// Got message, check if it's in the correct format.
if n < 3 || buf[0] != 0xFF || buf[1] != 0xFE {
log.Printf("%s\n", err.Error())
continue
}
s := string(buf[2:n])
s = strings.Replace(s, "\x00", "", -1)
ffIpAndPort := ip + ":4000"
msg, err := icmp.ParseMessage(iana.ProtocolICMP, buf[:n])
if err != nil {
continue
}
ip := peer.String()
// Look for echo replies, mark it as received.
if msg.Type == ipv4.ICMPTypeEchoReply {
pingResponse[ip] = time.Now()
continue // No further processing needed.
}
// Only deal with ICMP Unreachable packets (since that's what iOS and Android seem to be sending whenever the apps are not available).
if msg.Type != ipv4.ICMPTypeDestinationUnreachable {
continue
}
// Packet parsing.
mb, err := msg.Body.Marshal(iana.ProtocolICMP)
if err != nil {
continue
}
if len(mb) < 28 {
continue
}
// The unreachable port.
port := (uint16(mb[26]) << 8) | uint16(mb[27])
ipAndPort := ip + ":" + strconv.Itoa(int(port))
netMutex.Lock()
p, ok := outSockets[ffIpAndPort]
p, ok := outSockets[ipAndPort]
if !ok {
// Can't do anything, the client isn't even technically connected.
netMutex.Unlock()
continue
}
if strings.HasPrefix(s, "i-want-to-play-ffm-udp") || strings.HasPrefix(s, "i-can-play-ffm-udp") {
if p.sleepMode {
log.Printf("%s - woke up\n", ffIpAndPort)
p.sleepMode = false
}
} else if strings.HasPrefix(s, "i-cannot-play-ffm-udp") {
if !p.sleepMode {
log.Printf("%s - went to sleep\n", ffIpAndPort)
p.sleepMode = true
}
}
outSockets[ffIpAndPort] = p
p.lastUnreachable = time.Now()
outSockets[ipAndPort] = p
netMutex.Unlock()
}
}
@ -247,6 +310,7 @@ func sleepMonitor() {
func initNetwork() {
messageQueue = make(chan networkMessage, 1024) // Buffered channel, 1024 messages.
outSockets = make(map[string]networkConnection)
pingResponse = make(map[string]time.Time)
netMutex = &sync.Mutex{}
refreshConnectedClients()
go monitorDHCPLeases()