Move db writes to a separate goroutine.

pull/360/head
Christopher Young 2016-04-01 14:05:23 -04:00
rodzic 044a95d801
commit dbacc6b6e7
1 zmienionych plików z 31 dodań i 16 usunięć

Wyświetl plik

@ -30,7 +30,7 @@ type StratuxTimestamp struct {
id int64
Time_type_preference int // 0 = stratuxClock, 1 = gpsClock, 2 = gpsClock extrapolated via stratuxClock.
StratuxClock_value time.Time
GPSClock_value time.Time
GPSClock_value time.Time // The value of this is either from the GPS or extrapolated from the GPS via stratuxClock if pref is 1 or 2. It is time.Time{} if 0.
PreferredTime_value time.Time
}
@ -66,6 +66,7 @@ func checkTimestamp() bool {
// Re-set the preferred timestamp type to '2' (extrapolated time).
ts.Time_type_preference = 2
ts.PreferredTime_value = extrapolatedGPSTimestamp
ts.GPSClock_value = extrapolatedGPSTimestamp
}
}
@ -267,7 +268,30 @@ type DataLogRow struct {
var dataLogChan chan DataLogRow
var shutdownDataLog chan bool
func dataLogWriter() {
var dataLogWriteChan chan DataLogRow
func dataLogWriter(db *sql.DB) {
dataLogWriteChan = make(chan DataLogRow, 10240)
// The write queue. As data comes in via dataLogChan, it is timestamped and stored.
// When writeTicker comes up, the queue is emptied.
writeTicker := time.NewTicker(5 * time.Second)
rowsQueuedForWrite := make([]DataLogRow, 0)
for {
select {
case r := <-dataLogWriteChan:
// Accept timestamped row.
rowsQueuedForWrite = append(rowsQueuedForWrite, r)
case <-writeTicker.C:
// Write the buffered rows. This will block occasionally.
for _, r := range rowsQueuedForWrite {
insertData(r.data, r.tbl, db, r.ts_num)
}
rowsQueuedForWrite = make([]DataLogRow, 0) // Zero the queue.
}
}
}
func dataLog() {
dataLogChan = make(chan DataLogRow, 10240)
shutdownDataLog = make(chan bool)
dataLogTimestamps = make(map[int64]StratuxTimestamp, 0)
@ -286,6 +310,8 @@ func dataLogWriter() {
}
defer db.Close()
go dataLogWriter(db)
// Do we need to create the database?
if createDatabase {
makeTable(StratuxTimestamp{}, "timestamp", db)
@ -297,27 +323,16 @@ func dataLogWriter() {
makeTable(Dump1090TermMessage{}, "dump1090_terminal", db)
}
// The write queue. As data comes in via dataLogChan, it is timestamped and stored.
// When writeTicker comes up, the queue is emptied.
rowsQueuedForWrite := make([]DataLogRow, 0)
writeTicker := time.NewTicker(5 * time.Second)
for {
select {
//FIXME: measure latency from here to end of block. Messages may need to be timestamped *before* executing everything here.
case r := <-dataLogChan:
// When data is input, the first step is to timestamp it.
// Check if our time bucket has expired or has never been entered.
checkTimestamp()
// Mark the row with the current timestamp ID, in case it gets entered later.
r.ts_num = dataLogCurTimestamp
// Queue it for the scheduled write.
rowsQueuedForWrite = append(rowsQueuedForWrite, r)
case <-writeTicker.C:
for _, r := range rowsQueuedForWrite {
insertData(r.data, r.tbl, db, r.ts_num)
}
rowsQueuedForWrite = make([]DataLogRow, 0) // Zero the queue.
dataLogWriteChan <- r
case <-shutdownDataLog: // Received a message on the channel (anything). Graceful shutdown (defer statement).
return
}
@ -375,5 +390,5 @@ func logDump1090TermMessage(m Dump1090TermMessage) {
}
func initDataLog() {
go dataLogWriter()
go dataLog()
}