Batch data writes.

This will be done as part of a system where the database file is
compressed and moved periodically.
pull/360/head
Christopher Young 2016-04-01 08:26:37 -04:00
rodzic 4e5513ce95
commit b87aa52044
1 zmienionych plików z 53 dodań i 23 usunięć

Wyświetl plik

@ -27,14 +27,14 @@ const (
)
type StratuxTimestamp struct {
id int64
Time_type_preference int // 0 = stratuxClock, 1 = gpsClock, 2 = gpsClock extrapolated via stratuxClock.
StratuxClock_value time.Time
GPSClock_value time.Time
PreferredTime_value time.Time
}
var dataLogTimestamp StratuxTimestamp // Current timestamp bucket.
var dataLogTimestamps map[int64]StratuxTimestamp
var dataLogCurTimestamp int64 // Current timestamp bucket. This is an index on dataLogTimestamps which is not necessarily the db id.
/*
checkTimestamp().
@ -42,15 +42,17 @@ var dataLogTimestamp StratuxTimestamp // Current timestamp bucket.
Returns false if the timestamp was changed, true if it is still valid.
*/
//FIXME: time -> stratuxClock
func checkTimestamp() bool {
if stratuxClock.Since(dataLogTimestamp.StratuxClock_value) >= LOG_TIMESTAMP_RESOLUTION {
if stratuxClock.Since(dataLogTimestamps[dataLogCurTimestamp].StratuxClock_value) >= LOG_TIMESTAMP_RESOLUTION {
//FIXME: mutex.
dataLogTimestamp.id = 0
dataLogTimestamp.Time_type_preference = 0 // stratuxClock.
dataLogTimestamp.StratuxClock_value = stratuxClock.Time
dataLogTimestamp.GPSClock_value = time.Time{}
dataLogTimestamp.PreferredTime_value = stratuxClock.Time
var ts StratuxTimestamp
ts.id = 0
ts.Time_type_preference = 0 // stratuxClock.
ts.StratuxClock_value = stratuxClock.Time
ts.GPSClock_value = time.Time{}
ts.PreferredTime_value = stratuxClock.Time
dataLogCurTimestamp++
dataLogTimestamps[dataLogCurTimestamp] = ts
return false
}
@ -182,7 +184,7 @@ func makeTable(i interface{}, tbl string, db *sql.DB) {
}
}
func insertData(i interface{}, tbl string, db *sql.DB) int64 {
func insertData(i interface{}, tbl string, db *sql.DB, ts_num int64) int64 {
val := reflect.ValueOf(i)
keys := make([]string, 0)
@ -205,7 +207,13 @@ func insertData(i interface{}, tbl string, db *sql.DB) int64 {
// Add the timestamp_id field to link up with the timestamp table.
if tbl != "timestamp" {
keys = append(keys, "timestamp_id")
values = append(values, strconv.FormatInt(dataLogTimestamp.id, 10))
if dataLogTimestamps[ts_num].id == 0 {
//FIXME: This is somewhat convoluted. When insertData() is called for a ts_num that corresponds to a timestamp with no database id,
// then it inserts that timestamp via the same interface and the id is updated in the structure via the below lines
// (dataLogTimestamps[ts_num].id = id).
insertData(dataLogTimestamps[ts_num], "timestamp", db, ts_num) // Updates dataLogTimestamps[ts_num].id.
}
values = append(values, strconv.FormatInt(dataLogTimestamps[ts_num].id, 10))
}
tblInsert := fmt.Sprintf("INSERT INTO %s (%s) VALUES(%s)", tbl, strings.Join(keys, ","),
@ -222,7 +230,7 @@ func insertData(i interface{}, tbl string, db *sql.DB) int64 {
id, err := res.LastInsertId()
if err == nil {
if tbl == "timestamp" {
dataLogTimestamp.id = id
dataLogTimestamps[ts_num].id = id
}
return id
}
@ -231,8 +239,9 @@ func insertData(i interface{}, tbl string, db *sql.DB) int64 {
}
type DataLogRow struct {
tbl string
data interface{}
tbl string
data interface{}
ts_num int64
}
var dataLogChan chan DataLogRow
@ -241,6 +250,7 @@ var shutdownDataLog chan bool
func dataLogWriter() {
dataLogChan = make(chan DataLogRow, 10240)
shutdownDataLog = make(chan bool)
dataLogTimestamps = make(map[int64]StratuxTimestamp, 0)
// Check if we need to create a new database.
createDatabase := false
@ -258,7 +268,7 @@ func dataLogWriter() {
// Do we need to create the database?
if createDatabase {
makeTable(dataLogTimestamp, "timestamp", db)
makeTable(StratuxTimestamp{}, "timestamp", db)
makeTable(mySituation, "mySituation", db)
makeTable(globalStatus, "status", db)
makeTable(globalSettings, "settings", db)
@ -267,30 +277,50 @@ func dataLogWriter() {
makeTable(Dump1090TermMessage{}, "dump1090_terminal", db)
}
// The write queue. As data comes in via dataLogChan, it is timestamped and stored.
// When writeTicker comes up, the queue is emptied.
rowsQueuedForWrite := make([]DataLogRow, 0)
writeTicker := time.NewTicker(5 * time.Second)
for {
select {
//FIXME: measure latency from here to end of block. Messages may need to be timestamped *before* executing everything here.
case r := <-dataLogChan:
// Check if our time bucket has expired or has never been entered.
if !checkTimestamp() || dataLogTimestamp.id == 0 {
insertData(dataLogTimestamp, "timestamp", db) // Updates dataLogTimestamp.id.
checkTimestamp()
// Mark the row with the current timestamp ID, in case it gets entered later.
r.ts_num = dataLogCurTimestamp
// Queue it for the scheduled write.
rowsQueuedForWrite = append(rowsQueuedForWrite, r)
case <-writeTicker.C:
for _, r := range rowsQueuedForWrite {
insertData(r.data, r.tbl, db, r.ts_num)
}
insertData(r.data, r.tbl, db)
rowsQueuedForWrite = make([]DataLogRow, 0) // Zero the queue.
case <-shutdownDataLog: // Received a message on the channel (anything). Graceful shutdown (defer statement).
return
}
}
}
/*
setDataLogTimeWithGPS().
Create a timestamp entry using GPS time.
*/
func setDataLogTimeWithGPS(sit SituationData) {
if isGPSClockValid() {
//FIXME: mutex.
var ts StratuxTimestamp
// Piggyback a GPS time update from this update.
dataLogTimestamp.id = 0
dataLogTimestamp.Time_type_preference = 1 // gpsClock.
dataLogTimestamp.StratuxClock_value = stratuxClock.Time
dataLogTimestamp.GPSClock_value = sit.GPSTime
dataLogTimestamp.PreferredTime_value = sit.GPSTime
ts.id = 0
ts.Time_type_preference = 1 // gpsClock.
ts.StratuxClock_value = stratuxClock.Time
ts.GPSClock_value = sit.GPSTime
ts.PreferredTime_value = sit.GPSTime
dataLogCurTimestamp++
dataLogTimestamps[dataLogCurTimestamp] = ts
}
}