kopia lustrzana https://github.com/cyoung/stratux
Save some cycles when message queues aren't that large.
rodzic
94bfd8304e
commit
c4c427ddb2
|
@ -5,6 +5,7 @@ import (
|
||||||
"golang.org/x/net/ipv4"
|
"golang.org/x/net/ipv4"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"log"
|
"log"
|
||||||
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
"os"
|
"os"
|
||||||
|
@ -185,15 +186,23 @@ func refreshConnectedClients() {
|
||||||
|
|
||||||
func messageQueueSender() {
|
func messageQueueSender() {
|
||||||
secondTimer := time.NewTicker(5 * time.Second)
|
secondTimer := time.NewTicker(5 * time.Second)
|
||||||
queueTimer := time.NewTicker(400 * time.Microsecond) // 2500 msg/sec
|
|
||||||
|
queueTimer := time.NewTicker(1 * time.Second)
|
||||||
|
// queueTimer := time.NewTicker(400 * time.Microsecond) // 2500 msg/sec
|
||||||
|
|
||||||
|
var lastQueueTimeChange time.Time // Reevaluate send frequency every 5 seconds.
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case msg := <-messageQueue:
|
case msg := <-messageQueue:
|
||||||
sendToAllConnectedClients(msg)
|
sendToAllConnectedClients(msg)
|
||||||
case <-queueTimer.C:
|
case <-queueTimer.C:
|
||||||
netMutex.Lock()
|
netMutex.Lock()
|
||||||
|
|
||||||
|
averageSendableQueueSize := float64(0.0)
|
||||||
for k, netconn := range outSockets {
|
for k, netconn := range outSockets {
|
||||||
if len(netconn.messageQueue) > 0 && !isSleeping(k) && !isThrottled(k) {
|
if len(netconn.messageQueue) > 0 && !isSleeping(k) && !isThrottled(k) {
|
||||||
|
averageSendableQueueSize += float64(len(netconn.messageQueue)) // Add num sendable messages.
|
||||||
|
|
||||||
tmpConn := netconn
|
tmpConn := netconn
|
||||||
tmpConn.Conn.Write(tmpConn.messageQueue[0])
|
tmpConn.Conn.Write(tmpConn.messageQueue[0])
|
||||||
totalNetworkMessagesSent++
|
totalNetworkMessagesSent++
|
||||||
|
@ -201,6 +210,19 @@ func messageQueueSender() {
|
||||||
outSockets[k] = tmpConn
|
outSockets[k] = tmpConn
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if time.Since(lastQueueTimeChange) >= 5 * time.Second {
|
||||||
|
var pd float64
|
||||||
|
if averageSendableQueueSize > 0.0 && len(outSockets) > 0 {
|
||||||
|
averageSendableQueueSize = averageSendableQueueSize / float64(len(outSockets)) // It's a total, not an average, up until this point.
|
||||||
|
pd = math.Max(float64(1.0/2500.0), float64(1.0/(4.0*averageSendableQueueSize))) // Say 250msg is enough to get through the whole queue.
|
||||||
|
} else {
|
||||||
|
pd = float64(1.0/2500.0)
|
||||||
|
}
|
||||||
|
queueTimer.Stop()
|
||||||
|
queueTimer = time.NewTicker(int64(pd * 1000000000.0) * time.Nanosecond)
|
||||||
|
lastQueueTimeChange = time.Now()
|
||||||
|
}
|
||||||
|
averageSendableQueueSize
|
||||||
netMutex.Unlock()
|
netMutex.Unlock()
|
||||||
case <-secondTimer.C:
|
case <-secondTimer.C:
|
||||||
getNetworkStats()
|
getNetworkStats()
|
||||||
|
|
Ładowanie…
Reference in New Issue