kopia lustrzana https://github.com/cyoung/stratux
				
				
				
			
		
			
				
	
	
		
			636 wiersze
		
	
	
		
			21 KiB
		
	
	
	
		
			Go
		
	
	
			
		
		
	
	
			636 wiersze
		
	
	
		
			21 KiB
		
	
	
	
		
			Go
		
	
	
| /*
 | |
| 	Copyright (c) 2015-2016 Christopher Young
 | |
| 	Distributable under the terms of The "BSD New" License
 | |
| 	that can be found in the LICENSE file, herein included
 | |
| 	as part of this header.
 | |
| 
 | |
| 	datalog.go: Log stratux data as it is received. Bucket data into timestamp time slots.
 | |
| 
 | |
| */
 | |
| 
 | |
| package main
 | |
| 
 | |
| import (
 | |
| 	"database/sql"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	_ "github.com/mattn/go-sqlite3"
 | |
| 	"log"
 | |
| 	"os"
 | |
| 	"reflect"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
// LOG_TIMESTAMP_RESOLUTION is the width of one timestamp bucket. checkTimestamp()
// opens a new bucket once the current one is at least this old.
const LOG_TIMESTAMP_RESOLUTION = 250 * time.Millisecond
 | |
| 
 | |
// StratuxTimestamp is one timestamp bucket. Logged rows refer to a bucket by its
// index in dataLogTimestamps and, once the bucket has been written, by its row
// id in the "timestamp" table.
//
// The field order is significant: makeTable() and insertData() derive the
// column list from it via reflection.
type StratuxTimestamp struct {
	// id is the database row id. It stays 0 until insertData() has written the bucket.
	id int64
	// Time_type_preference selects the source of PreferredTime_value:
	// 0 = stratuxClock, 1 = gpsClock, 2 = gpsClock extrapolated via stratuxClock.
	Time_type_preference int
	// StratuxClock_value is the stratuxClock reading taken when the bucket was opened.
	StratuxClock_value time.Time
	// GPSClock_value is either from the GPS or extrapolated from the GPS via
	// stratuxClock if the preference is 1 or 2. It is time.Time{} if 0.
	GPSClock_value time.Time
	// PreferredTime_value is the time chosen according to Time_type_preference.
	PreferredTime_value time.Time
	// StartupID links the bucket to a row of the "startup" table.
	StartupID int64
}
 | |
| 
 | |
// StratuxStartup is the row type of the 'startup' table, which gets a new entry
// each time the daemon is started. This keeps track of sequential starts, even
// if the timestamp is ambiguous (units with no GPS). The struct is just a
// placeholder for an otherwise empty table (other than the primary key).
type StratuxStartup struct {
	// id is the primary key; it is never written explicitly.
	id int64
	// Fill is the only real column of the table.
	Fill string
}
 | |
| 
 | |
| var dataLogStarted bool
 | |
| var dataLogReadyToWrite bool
 | |
| 
 | |
| var stratuxStartupID int64
 | |
| var dataLogTimestamps []StratuxTimestamp
 | |
| var dataLogCurTimestamp int64 // Current timestamp bucket. This is an index on dataLogTimestamps which is not necessarily the db id.
 | |
| 
 | |
| /*
 | |
| 	checkTimestamp().
 | |
| 		Verify that our current timestamp is within the LOG_TIMESTAMP_RESOLUTION bucket.
 | |
| 		 Returns false if the timestamp was changed, true if it is still valid.
 | |
| 		 This is where GPS timestamps are extrapolated, if the GPS data is currently valid.
 | |
| */
 | |
| 
 | |
| func checkTimestamp() bool {
 | |
| 	thisCurTimestamp := dataLogCurTimestamp
 | |
| 	if stratuxClock.Since(dataLogTimestamps[thisCurTimestamp].StratuxClock_value) >= LOG_TIMESTAMP_RESOLUTION {
 | |
| 		var ts StratuxTimestamp
 | |
| 		ts.id = 0
 | |
| 		ts.Time_type_preference = 0 // stratuxClock.
 | |
| 		ts.StratuxClock_value = stratuxClock.Time
 | |
| 		ts.GPSClock_value = time.Time{}
 | |
| 		ts.PreferredTime_value = stratuxClock.Time
 | |
| 
 | |
| 		// Extrapolate from GPS timestamp, if possible.
 | |
| 		if isGPSClockValid() && thisCurTimestamp > 0 {
 | |
| 			// Was the last timestamp either extrapolated or GPS time?
 | |
| 			last_ts := dataLogTimestamps[thisCurTimestamp]
 | |
| 			if last_ts.Time_type_preference == 1 || last_ts.Time_type_preference == 2 {
 | |
| 				// Extrapolate via stratuxClock.
 | |
| 				timeSinceLastTS := ts.StratuxClock_value.Sub(last_ts.StratuxClock_value) // stratuxClock ticks since last timestamp.
 | |
| 				extrapolatedGPSTimestamp := last_ts.PreferredTime_value.Add(timeSinceLastTS)
 | |
| 
 | |
| 				// Re-set the preferred timestamp type to '2' (extrapolated time).
 | |
| 				ts.Time_type_preference = 2
 | |
| 				ts.PreferredTime_value = extrapolatedGPSTimestamp
 | |
| 				ts.GPSClock_value = extrapolatedGPSTimestamp
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		dataLogTimestamps = append(dataLogTimestamps, ts)
 | |
| 		dataLogCurTimestamp = int64(len(dataLogTimestamps) - 1)
 | |
| 		return false
 | |
| 	}
 | |
| 	return true
 | |
| }
 | |
| 
 | |
// SQLiteMarshal describes how one category of Go values is stored in SQLite.
type SQLiteMarshal struct {
	// FieldType is the column type used by makeTable() in CREATE TABLE.
	FieldType string
	// Marshal renders a field value as the string that insertData() binds to
	// the INSERT statement.
	Marshal func(v reflect.Value) string
}
 | |
| 
 | |
// boolMarshal renders a bool value as "1" (true) or "0" (false).
func boolMarshal(v reflect.Value) string {
	if !v.Bool() {
		return "0"
	}
	return "1"
}
 | |
| 
 | |
// intMarshal renders a signed integer value of any width in base 10.
func intMarshal(v reflect.Value) string {
	n := v.Int()
	return strconv.FormatInt(n, 10)
}
 | |
| 
 | |
// uintMarshal renders an unsigned integer value of any width in base 10.
func uintMarshal(v reflect.Value) string {
	n := v.Uint()
	return strconv.FormatUint(n, 10)
}
 | |
| 
 | |
// floatMarshal renders a floating-point value in plain decimal notation with
// ten digits after the decimal point.
func floatMarshal(v reflect.Value) string {
	const (
		format    = 'f'
		precision = 10
		bitSize   = 64
	)
	return strconv.FormatFloat(v.Float(), format, precision, bitSize)
}
 | |
| 
 | |
// stringMarshal returns the string held by v unchanged.
func stringMarshal(v reflect.Value) string {
	s := v.String()
	return s
}
 | |
| 
 | |
// notsupportedMarshal is the placeholder marshaller for kinds that are not
// logged; it ignores v and always yields the empty string.
func notsupportedMarshal(v reflect.Value) string {
	const nothing = ""
	return nothing
}
 | |
| 
 | |
// structCanBeMarshalled reports whether v has a method named "String", which
// is what structMarshal() calls to render a struct value.
func structCanBeMarshalled(v reflect.Value) bool {
	stringer := v.MethodByName("String")
	return stringer.IsValid() && !stringer.IsNil()
}
 | |
| 
 | |
| func structMarshal(v reflect.Value) string {
 | |
| 	if structCanBeMarshalled(v) {
 | |
| 		m := v.MethodByName("String")
 | |
| 		in := make([]reflect.Value, 0)
 | |
| 		ret := m.Call(in)
 | |
| 		if len(ret) > 0 {
 | |
| 			return ret[0].String()
 | |
| 		}
 | |
| 	}
 | |
| 	return ""
 | |
| }
 | |
| 
 | |
| var sqliteMarshalFunctions = map[string]SQLiteMarshal{
 | |
| 	"bool":         {FieldType: "INTEGER", Marshal: boolMarshal},
 | |
| 	"int":          {FieldType: "INTEGER", Marshal: intMarshal},
 | |
| 	"uint":         {FieldType: "INTEGER", Marshal: uintMarshal},
 | |
| 	"float":        {FieldType: "REAL", Marshal: floatMarshal},
 | |
| 	"string":       {FieldType: "TEXT", Marshal: stringMarshal},
 | |
| 	"struct":       {FieldType: "STRING", Marshal: structMarshal},
 | |
| 	"notsupported": {FieldType: "notsupported", Marshal: notsupportedMarshal},
 | |
| }
 | |
| 
 | |
// sqlTypeMap maps every reflect.Kind a struct field can have to the alias used
// as key into sqliteMarshalFunctions. Kinds that are not logged map to
// "notsupported".
var sqlTypeMap = func() map[reflect.Kind]string {
	groups := []struct {
		alias string
		kinds []reflect.Kind
	}{
		{"bool", []reflect.Kind{reflect.Bool}},
		{"int", []reflect.Kind{reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64}},
		{"uint", []reflect.Kind{reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64}},
		{"float", []reflect.Kind{reflect.Float32, reflect.Float64}},
		{"string", []reflect.Kind{reflect.String}},
		{"struct", []reflect.Kind{reflect.Struct}},
		{"notsupported", []reflect.Kind{
			reflect.Uintptr, reflect.Complex64, reflect.Complex128, reflect.Array,
			reflect.Chan, reflect.Func, reflect.Interface, reflect.Map,
			reflect.Ptr, reflect.Slice, reflect.UnsafePointer,
		}},
	}

	m := make(map[reflect.Kind]string)
	for _, g := range groups {
		for _, k := range g.kinds {
			m[k] = g.alias
		}
	}
	return m
}()
 | |
| 
 | |
| func makeTable(i interface{}, tbl string, db *sql.DB) {
 | |
| 	val := reflect.ValueOf(i)
 | |
| 
 | |
| 	fields := make([]string, 0)
 | |
| 	for i := 0; i < val.NumField(); i++ {
 | |
| 		kind := val.Field(i).Kind()
 | |
| 		fieldName := val.Type().Field(i).Name
 | |
| 		sqlTypeAlias := sqlTypeMap[kind]
 | |
| 
 | |
| 		// Check that if the field is a struct that it can be marshalled.
 | |
| 		if sqlTypeAlias == "struct" && !structCanBeMarshalled(val.Field(i)) {
 | |
| 			continue
 | |
| 		}
 | |
| 		if sqlTypeAlias == "notsupported" || fieldName == "id" {
 | |
| 			continue
 | |
| 		}
 | |
| 		sqlType := sqliteMarshalFunctions[sqlTypeAlias].FieldType
 | |
| 		s := fieldName + " " + sqlType
 | |
| 		fields = append(fields, s)
 | |
| 	}
 | |
| 
 | |
| 	// Add the timestamp_id field to link up with the timestamp table.
 | |
| 	if tbl != "timestamp" && tbl != "startup" {
 | |
| 		fields = append(fields, "timestamp_id INTEGER")
 | |
| 	}
 | |
| 
 | |
| 	tblCreate := fmt.Sprintf("CREATE TABLE %s (id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, %s)", tbl, strings.Join(fields, ", "))
 | |
| 
 | |
| 	_, err := db.Exec(tblCreate)
 | |
| 	if err != nil {
 | |
| 		fmt.Printf("ERROR: %s\n", err.Error())
 | |
| 	}
 | |
| }
 | |
| 
 | |
| /*
 | |
| 	bulkInsert().
 | |
| 		Reads insertBatch and insertBatchIfs. This is called after a group of insertData() calls.
 | |
| */
 | |
| 
 | |
| func bulkInsert(tbl string, db *sql.DB) (res sql.Result, err error) {
 | |
| 	if _, ok := insertString[tbl]; !ok {
 | |
| 		return nil, errors.New("no insert statement")
 | |
| 	}
 | |
| 
 | |
| 	batchVals := insertBatchIfs[tbl]
 | |
| 	numColsPerRow := len(batchVals[0])
 | |
| 	maxRowBatch := int(999 / numColsPerRow) // SQLITE_MAX_VARIABLE_NUMBER = 999.
 | |
| 	//	log.Printf("table %s. %d cols per row. max batch %d\n", tbl, numColsPerRow, maxRowBatch)
 | |
| 	for len(batchVals) > 0 {
 | |
| 		//     timeInit := time.Now()
 | |
| 		i := int(0) // Variable number of rows per INSERT statement.
 | |
| 
 | |
| 		stmt := ""
 | |
| 		vals := make([]interface{}, 0)
 | |
| 		querySize := uint64(0)                                            // Size of the query in bytes.
 | |
| 		for len(batchVals) > 0 && i < maxRowBatch && querySize < 750000 { // Maximum of 1,000,000 bytes per query.
 | |
| 			if len(stmt) == 0 { // The first set will be covered by insertString.
 | |
| 				stmt = insertString[tbl]
 | |
| 				querySize += uint64(len(insertString[tbl]))
 | |
| 			} else {
 | |
| 				addStr := ", (" + strings.Join(strings.Split(strings.Repeat("?", len(batchVals[0])), ""), ",") + ")"
 | |
| 				stmt += addStr
 | |
| 				querySize += uint64(len(addStr))
 | |
| 			}
 | |
| 			for _, val := range batchVals[0] {
 | |
| 				querySize += uint64(len(val.(string)))
 | |
| 			}
 | |
| 			vals = append(vals, batchVals[0]...)
 | |
| 			batchVals = batchVals[1:]
 | |
| 			i++
 | |
| 		}
 | |
| 		//		log.Printf("inserting %d rows to %s. querySize=%d\n", i, tbl, querySize)
 | |
| 		res, err = db.Exec(stmt, vals...)
 | |
| 		//      timeBatch := time.Since(timeInit)                                                                                                                     // debug
 | |
| 		//      log.Printf("SQLite: bulkInserted %d rows to %s. Took %f msec to build and insert query. querySize=%d\n", i, tbl, 1000*timeBatch.Seconds(), querySize) // debug
 | |
| 		if err != nil {
 | |
| 			log.Printf("sqlite INSERT error: '%s'\n", err.Error())
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	// Clear the buffers.
 | |
| 	delete(insertString, tbl)
 | |
| 	delete(insertBatchIfs, tbl)
 | |
| 
 | |
| 	return
 | |
| }
 | |
| 
 | |
| /*
 | |
| 	insertData().
 | |
| 		Inserts an arbitrary struct into an SQLite table.
 | |
| 		 Inserts the timestamp first, if its 'id' is 0.
 | |
| 
 | |
| */
 | |
| 
 | |
// Insert buffers, both indexed by table name. insertData() fills them and
// bulkInsert() drains them.
var (
	// insertString caches the statement for a single row:
	// INSERT INTO tbl (col1, col2, ...) VALUES(?, ?, ...).
	insertString map[string]string
	// insertBatchIfs holds the queued rows, one slice of bind values per row.
	insertBatchIfs map[string][][]interface{}
)
 | |
| 
 | |
| func insertData(i interface{}, tbl string, db *sql.DB, ts_num int64) int64 {
 | |
| 	val := reflect.ValueOf(i)
 | |
| 
 | |
| 	keys := make([]string, 0)
 | |
| 	values := make([]string, 0)
 | |
| 	for i := 0; i < val.NumField(); i++ {
 | |
| 		kind := val.Field(i).Kind()
 | |
| 		fieldName := val.Type().Field(i).Name
 | |
| 		sqlTypeAlias := sqlTypeMap[kind]
 | |
| 
 | |
| 		if sqlTypeAlias == "notsupported" || fieldName == "id" {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		v := sqliteMarshalFunctions[sqlTypeAlias].Marshal(val.Field(i))
 | |
| 
 | |
| 		keys = append(keys, fieldName)
 | |
| 		values = append(values, v)
 | |
| 	}
 | |
| 
 | |
| 	// Add the timestamp_id field to link up with the timestamp table.
 | |
| 	if tbl != "timestamp" && tbl != "startup" {
 | |
| 		keys = append(keys, "timestamp_id")
 | |
| 		if dataLogTimestamps[ts_num].id == 0 {
 | |
| 			//FIXME: This is somewhat convoluted. When insertData() is called for a ts_num that corresponds to a timestamp with no database id,
 | |
| 			// then it inserts that timestamp via the same interface and the id is updated in the structure via the below lines
 | |
| 			// (dataLogTimestamps[ts_num].id = id).
 | |
| 			dataLogTimestamps[ts_num].StartupID = stratuxStartupID
 | |
| 			insertData(dataLogTimestamps[ts_num], "timestamp", db, ts_num) // Updates dataLogTimestamps[ts_num].id.
 | |
| 		}
 | |
| 		values = append(values, strconv.FormatInt(dataLogTimestamps[ts_num].id, 10))
 | |
| 	}
 | |
| 
 | |
| 	if _, ok := insertString[tbl]; !ok {
 | |
| 		// Prepare the statement.
 | |
| 		tblInsert := fmt.Sprintf("INSERT INTO %s (%s) VALUES(%s)", tbl, strings.Join(keys, ","),
 | |
| 			strings.Join(strings.Split(strings.Repeat("?", len(keys)), ""), ","))
 | |
| 		insertString[tbl] = tblInsert
 | |
| 	}
 | |
| 
 | |
| 	// Make the values slice into a slice of interface{}.
 | |
| 	ifs := make([]interface{}, len(values))
 | |
| 	for i := 0; i < len(values); i++ {
 | |
| 		ifs[i] = values[i]
 | |
| 	}
 | |
| 
 | |
| 	insertBatchIfs[tbl] = append(insertBatchIfs[tbl], ifs)
 | |
| 
 | |
| 	if tbl == "timestamp" || tbl == "startup" { // Immediate insert always for "timestamp" and "startup" table.
 | |
| 		res, err := bulkInsert(tbl, db) // Bulk insert of 1, always.
 | |
| 		if err == nil {
 | |
| 			id, err := res.LastInsertId()
 | |
| 			if err == nil && tbl == "timestamp" { // Special handling for timestamps. Update the timestamp ID.
 | |
| 				ts := dataLogTimestamps[ts_num]
 | |
| 				ts.id = id
 | |
| 				dataLogTimestamps[ts_num] = ts
 | |
| 			}
 | |
| 			return id
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return 0
 | |
| }
 | |
| 
 | |
// DataLogRow is one queued log entry travelling through dataLogChan and
// dataLogWriteChan.
type DataLogRow struct {
	// tbl is the name of the destination table.
	tbl string
	// data is the struct value to be stored.
	data interface{}
	// ts_num is the index into dataLogTimestamps of the bucket the row belongs
	// to; dataLog() fills it in.
	ts_num int64
}
 | |
| 
 | |
| var dataLogChan chan DataLogRow
 | |
| var shutdownDataLog chan bool
 | |
| var shutdownDataLogWriter chan bool
 | |
| 
 | |
| var dataLogWriteChan chan DataLogRow
 | |
| 
 | |
| func dataLogWriter(db *sql.DB) {
 | |
| 	dataLogWriteChan = make(chan DataLogRow, 10240)
 | |
| 	shutdownDataLogWriter = make(chan bool)
 | |
| 	// The write queue. As data comes in via dataLogChan, it is timestamped and stored.
 | |
| 	//  When writeTicker comes up, the queue is emptied.
 | |
| 	writeTicker := time.NewTicker(10 * time.Second)
 | |
| 	rowsQueuedForWrite := make([]DataLogRow, 0)
 | |
| 	for {
 | |
| 		select {
 | |
| 		case r := <-dataLogWriteChan:
 | |
| 			// Accept timestamped row.
 | |
| 			rowsQueuedForWrite = append(rowsQueuedForWrite, r)
 | |
| 		case <-writeTicker.C:
 | |
| 			//			for i := 0; i < 1000; i++ {
 | |
| 			//				logSituation()
 | |
| 			//			}
 | |
| 			timeStart := stratuxClock.Time
 | |
| 			nRows := len(rowsQueuedForWrite)
 | |
| 			if globalSettings.DEBUG {
 | |
| 				log.Printf("Writing %d rows\n", nRows)
 | |
| 			}
 | |
| 			// Write the buffered rows. This will block while it is writing.
 | |
| 			// Save the names of the tables affected so that we can run bulkInsert() on after the insertData() calls.
 | |
| 			tblsAffected := make(map[string]bool)
 | |
| 			// Start transaction.
 | |
| 			tx, err := db.Begin()
 | |
| 			if err != nil {
 | |
| 				log.Printf("db.Begin() error: %s\n", err.Error())
 | |
| 				break // from select {}
 | |
| 			}
 | |
| 			for _, r := range rowsQueuedForWrite {
 | |
| 				tblsAffected[r.tbl] = true
 | |
| 				insertData(r.data, r.tbl, db, r.ts_num)
 | |
| 			}
 | |
| 			// Do the bulk inserts.
 | |
| 			for tbl, _ := range tblsAffected {
 | |
| 				bulkInsert(tbl, db)
 | |
| 			}
 | |
| 			// Close the transaction.
 | |
| 			tx.Commit()
 | |
| 			rowsQueuedForWrite = make([]DataLogRow, 0) // Zero the queue.
 | |
| 			timeElapsed := stratuxClock.Since(timeStart)
 | |
| 			if globalSettings.DEBUG {
 | |
| 				rowsPerSecond := float64(nRows) / float64(timeElapsed.Seconds())
 | |
| 				log.Printf("Writing finished. %d rows in %.2f seconds (%.1f rows per second).\n", nRows, float64(timeElapsed.Seconds()), rowsPerSecond)
 | |
| 			}
 | |
| 			if timeElapsed.Seconds() > 10.0 {
 | |
| 				log.Printf("WARNING! SQLite logging is behind. Last write took %.1f seconds.\n", float64(timeElapsed.Seconds()))
 | |
| 				dataLogCriticalErr := fmt.Errorf("WARNING! SQLite logging is behind. Last write took %.1f seconds.\n", float64(timeElapsed.Seconds()))
 | |
| 				addSystemError(dataLogCriticalErr)
 | |
| 			}
 | |
| 		case <-shutdownDataLogWriter: // Received a message on the channel to initiate a graceful shutdown, and to command dataLog() to shut down
 | |
| 			log.Printf("datalog.go: dataLogWriter() received shutdown message with rowsQueuedForWrite = %d\n", len(rowsQueuedForWrite))
 | |
| 			shutdownDataLog <- true
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 	log.Printf("datalog.go: dataLogWriter() shutting down\n")
 | |
| }
 | |
| 
 | |
| func dataLog() {
 | |
| 	dataLogStarted = true
 | |
| 	log.Printf("datalog.go: dataLog() started\n")
 | |
| 	dataLogChan = make(chan DataLogRow, 10240)
 | |
| 	shutdownDataLog = make(chan bool)
 | |
| 	dataLogTimestamps = make([]StratuxTimestamp, 0)
 | |
| 	var ts StratuxTimestamp
 | |
| 	ts.id = 0
 | |
| 	ts.Time_type_preference = 0 // stratuxClock.
 | |
| 	ts.StratuxClock_value = stratuxClock.Time
 | |
| 	ts.GPSClock_value = time.Time{}
 | |
| 	ts.PreferredTime_value = stratuxClock.Time
 | |
| 	dataLogTimestamps = append(dataLogTimestamps, ts)
 | |
| 	dataLogCurTimestamp = 0
 | |
| 
 | |
| 	// Check if we need to create a new database.
 | |
| 	createDatabase := false
 | |
| 
 | |
| 	if _, err := os.Stat(dataLogFilef); os.IsNotExist(err) {
 | |
| 		createDatabase = true
 | |
| 		log.Printf("creating new database '%s'.\n", dataLogFilef)
 | |
| 	}
 | |
| 
 | |
| 	db, err := sql.Open("sqlite3", dataLogFilef)
 | |
| 	if err != nil {
 | |
| 		log.Printf("sql.Open(): %s\n", err.Error())
 | |
| 	}
 | |
| 
 | |
| 	defer func() {
 | |
| 		db.Close()
 | |
| 		dataLogStarted = false
 | |
| 		//close(dataLogChan)
 | |
| 		log.Printf("datalog.go: dataLog() has closed DB in %s\n", dataLogFilef)
 | |
| 	}()
 | |
| 
 | |
| 	_, err = db.Exec("PRAGMA journal_mode=WAL")
 | |
| 	if err != nil {
 | |
| 		log.Printf("db.Exec('PRAGMA journal_mode=WAL') err: %s\n", err.Error())
 | |
| 	}
 | |
| 	_, err = db.Exec("PRAGMA synchronous=OFF")
 | |
| 	if err != nil {
 | |
| 		log.Printf("db.Exec('PRAGMA journal_mode=WAL') err: %s\n", err.Error())
 | |
| 	}
 | |
| 
 | |
| 	//log.Printf("Starting dataLogWriter\n") // REMOVE -- DEBUG
 | |
| 	go dataLogWriter(db)
 | |
| 
 | |
| 	// Do we need to create the database?
 | |
| 	if createDatabase {
 | |
| 		makeTable(StratuxTimestamp{}, "timestamp", db)
 | |
| 		makeTable(mySituation, "mySituation", db)
 | |
| 		makeTable(globalStatus, "status", db)
 | |
| 		makeTable(globalSettings, "settings", db)
 | |
| 		makeTable(TrafficInfo{}, "traffic", db)
 | |
| 		makeTable(msg{}, "messages", db)
 | |
| 		makeTable(esmsg{}, "es_messages", db)
 | |
| 		makeTable(Dump1090TermMessage{}, "dump1090_terminal", db)
 | |
| 		makeTable(StratuxStartup{}, "startup", db)
 | |
| 	}
 | |
| 
 | |
| 	// The first entry to be created is the "startup" entry.
 | |
| 	stratuxStartupID = insertData(StratuxStartup{}, "startup", db, 0)
 | |
| 
 | |
| 	dataLogReadyToWrite = true
 | |
| 	//log.Printf("Entering dataLog read loop\n") //REMOVE -- DEBUG
 | |
| 	for {
 | |
| 		select {
 | |
| 		case r := <-dataLogChan:
 | |
| 			// When data is input, the first step is to timestamp it.
 | |
| 			// Check if our time bucket has expired or has never been entered.
 | |
| 			checkTimestamp()
 | |
| 			// Mark the row with the current timestamp ID, in case it gets entered later.
 | |
| 			r.ts_num = dataLogCurTimestamp
 | |
| 			// Queue it for the scheduled write.
 | |
| 			dataLogWriteChan <- r
 | |
| 		case <-shutdownDataLog: // Received a message on the channel to complete a graceful shutdown (see the 'defer func()...' statement above).
 | |
| 			log.Printf("datalog.go: dataLog() received shutdown message\n")
 | |
| 			return
 | |
| 		}
 | |
| 	}
 | |
| 	log.Printf("datalog.go: dataLog() shutting down\n")
 | |
| 	close(shutdownDataLog)
 | |
| }
 | |
| 
 | |
| /*
 | |
| 	setDataLogTimeWithGPS().
 | |
| 		Create a timestamp entry using GPS time.
 | |
| */
 | |
| 
 | |
| func setDataLogTimeWithGPS(sit SituationData) {
 | |
| 	if isGPSClockValid() {
 | |
| 		var ts StratuxTimestamp
 | |
| 		// Piggyback a GPS time update from this update.
 | |
| 		ts.id = 0
 | |
| 		ts.Time_type_preference = 1 // gpsClock.
 | |
| 		ts.StratuxClock_value = stratuxClock.Time
 | |
| 		ts.GPSClock_value = sit.GPSTime
 | |
| 		ts.PreferredTime_value = sit.GPSTime
 | |
| 
 | |
| 		dataLogTimestamps = append(dataLogTimestamps, ts)
 | |
| 		dataLogCurTimestamp = int64(len(dataLogTimestamps) - 1)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| /*
 | |
| 	logSituation(), logStatus(), ... pass messages from other functions to the logging
 | |
| 		engine. These are only read into `dataLogChan` if the Replay Log is toggled on,
 | |
| 		and if the log system is ready to accept writes.
 | |
| */
 | |
| 
 | |
| func isDataLogReady() bool {
 | |
| 	return dataLogReadyToWrite
 | |
| }
 | |
| 
 | |
| func logSituation() {
 | |
| 	if globalSettings.ReplayLog && isDataLogReady() {
 | |
| 		dataLogChan <- DataLogRow{tbl: "mySituation", data: mySituation}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func logStatus() {
 | |
| 	if globalSettings.ReplayLog && isDataLogReady() {
 | |
| 		dataLogChan <- DataLogRow{tbl: "status", data: globalStatus}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func logSettings() {
 | |
| 	if globalSettings.ReplayLog && isDataLogReady() {
 | |
| 		dataLogChan <- DataLogRow{tbl: "settings", data: globalSettings}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func logTraffic(ti TrafficInfo) {
 | |
| 	if globalSettings.ReplayLog && isDataLogReady() {
 | |
| 		dataLogChan <- DataLogRow{tbl: "traffic", data: ti}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func logMsg(m msg) {
 | |
| 	if globalSettings.ReplayLog && isDataLogReady() {
 | |
| 		dataLogChan <- DataLogRow{tbl: "messages", data: m}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func logESMsg(m esmsg) {
 | |
| 	if globalSettings.ReplayLog && isDataLogReady() {
 | |
| 		dataLogChan <- DataLogRow{tbl: "es_messages", data: m}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func logDump1090TermMessage(m Dump1090TermMessage) {
 | |
| 	if globalSettings.DEBUG && globalSettings.ReplayLog && isDataLogReady() {
 | |
| 		dataLogChan <- DataLogRow{tbl: "dump1090_terminal", data: m}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func initDataLog() {
 | |
| 	//log.Printf("dataLogStarted = %t. dataLogReadyToWrite = %t\n", dataLogStarted, dataLogReadyToWrite) //REMOVE -- DEBUG
 | |
| 	insertString = make(map[string]string)
 | |
| 	insertBatchIfs = make(map[string][][]interface{})
 | |
| 	go dataLogWatchdog()
 | |
| 
 | |
| 	//log.Printf("datalog.go: initDataLog() complete.\n") //REMOVE -- DEBUG
 | |
| }
 | |
| 
 | |
| /*
 | |
| 	dataLogWatchdog(): Watchdog function to control startup / shutdown of data logging subsystem.
 | |
| 		Called by initDataLog as a goroutine. It iterates once per second to determine if
 | |
| 		globalSettings.ReplayLog has toggled. If logging was switched from off to on, it starts
 | |
| 		datalog() as a goroutine. If the log is running and we want it to stop, it calls
 | |
| 		closeDataLog() to turn off the input channels, close the log, and tear down the dataLog
 | |
| 		and dataLogWriter goroutines.
 | |
| */
 | |
| 
 | |
| func dataLogWatchdog() {
 | |
| 	for {
 | |
| 		if !dataLogStarted && globalSettings.ReplayLog { // case 1: sqlite logging isn't running, and we want to start it
 | |
| 			log.Printf("datalog.go: Watchdog wants to START logging.\n")
 | |
| 			go dataLog()
 | |
| 		} else if dataLogStarted && !globalSettings.ReplayLog { // case 2:  sqlite logging is running, and we want to shut it down
 | |
| 			log.Printf("datalog.go: Watchdog wants to STOP logging.\n")
 | |
| 			closeDataLog()
 | |
| 		}
 | |
| 		//log.Printf("Watchdog iterated.\n") //REMOVE -- DEBUG
 | |
| 		time.Sleep(1 * time.Second)
 | |
| 		//log.Printf("Watchdog sleep over.\n") //REMOVE -- DEBUG
 | |
| 	}
 | |
| }
 | |
| 
 | |
| /*
 | |
| 	closeDataLog(): Handler for graceful shutdown of data logging goroutines. It is called by
 | |
| 		by dataLogWatchdog(), gracefulShutdown(), and by any other function (disk space monitor?)
 | |
| 		that needs to be able to shut down sqlite logging without corrupting data or blocking
 | |
| 		execution.
 | |
| 
 | |
| 		This function turns off log message reads into the dataLogChan receiver, and sends a
 | |
| 		message to a quit channel ('shutdownDataLogWriter`) in dataLogWriter(). dataLogWriter()
 | |
| 		then sends a message to a quit channel to 'shutdownDataLog` in dataLog() to close *that*
 | |
| 		goroutine. That function sets dataLogStarted=false once the logfile is closed. By waiting
 | |
| 		for that signal, closeDataLog() won't exit until the log is safely written. This prevents
 | |
| 		data loss on shutdown.
 | |
| */
 | |
| 
 | |
| func closeDataLog() {
 | |
| 	//log.Printf("closeDataLog(): dataLogStarted = %t\n", dataLogStarted) //REMOVE -- DEBUG
 | |
| 	dataLogReadyToWrite = false // prevent any new messages from being sent down the channels
 | |
| 	log.Printf("datalog.go: Starting data log shutdown\n")
 | |
| 	shutdownDataLogWriter <- true      //
 | |
| 	defer close(shutdownDataLogWriter) // ... and close the channel so subsequent accidental writes don't stall execution
 | |
| 	log.Printf("datalog.go: Waiting for shutdown signal from dataLog()")
 | |
| 	for dataLogStarted {
 | |
| 		//log.Printf("closeDataLog(): dataLogStarted = %t\n", dataLogStarted) //REMOVE -- DEBUG
 | |
| 		time.Sleep(50 * time.Millisecond)
 | |
| 	}
 | |
| 	log.Printf("datalog.go: Data log shutdown successful.\n")
 | |
| }
 |