From 62dd58b07f15eb68c27c00c6d027cacca034b093 Mon Sep 17 00:00:00 2001
From: Christopher Young
Date: Fri, 1 Apr 2016 18:25:53 -0400
Subject: [PATCH] Use batched inserts instead of prepared statements.

---
 main/datalog.go | 93 ++++++++++++++++++++++++++++++++++++++-----------
 main/ry835ai.go |  8 +++--
 2 files changed, 77 insertions(+), 24 deletions(-)

diff --git a/main/datalog.go b/main/datalog.go
index 77a9286e..0886dcb4 100644
--- a/main/datalog.go
+++ b/main/datalog.go
@@ -12,6 +12,7 @@ package main
 
 import (
 	"database/sql"
+	"errors"
 	"fmt"
 	_ "github.com/mattn/go-sqlite3"
 	"log"
@@ -203,6 +204,45 @@ func makeTable(i interface{}, tbl string, db *sql.DB) {
 	}
 }
 
+/*
+	bulkInsert().
+	 Reads insertString and insertBatchIfs. This is called after a group of insertData() calls.
+*/
+
+func bulkInsert(tbl string, db *sql.DB) (res sql.Result, err error) {
+	if _, ok := insertString[tbl]; !ok {
+		return nil, errors.New("no insert statement")
+	}
+
+	batchVals := insertBatchIfs[tbl]
+	for len(batchVals) > 0 {
+		i := int(0) // Maximum of 10 rows per INSERT statement.
+		stmt := ""
+		vals := make([]interface{}, 0)
+		for len(batchVals) > 0 && i < 10 {
+			if len(stmt) == 0 { // The first set will be covered by insertString.
+				stmt = insertString[tbl]
+			} else {
+				stmt += ", (" + strings.Join(strings.Split(strings.Repeat("?", len(batchVals[0])), ""), ",") + ")"
+			}
+			vals = append(vals, batchVals[0]...)
+			batchVals = batchVals[1:]
+			i++
+		}
+		log.Printf("bulkInsert %d vals: %s\n", len(vals), stmt)
+		res, err = db.Exec(stmt, vals...)
+		if err != nil {
+			return
+		}
+	}
+
+	// Clear the buffers.
+	delete(insertString, tbl)
+	delete(insertBatchIfs, tbl)
+
+	return
+}
+
 /*
 	insertData().
 	 Inserts an arbitrary struct into an SQLite table.
@@ -210,8 +250,9 @@ func makeTable(i interface{}, tbl string, db *sql.DB) {
 
 */
 
-// Cached prepared statements. Indexed by table name.
-var preparedStmts map[string]*sql.Stmt
+// Cached 'VALUES' statements. Indexed by table name.
+var insertString map[string]string // INSERT INTO tbl (col1, col2, ...) VALUES(?, ?, ...). Only for one value.
+var insertBatchIfs map[string][][]interface{}
 
 func insertData(i interface{}, tbl string, db *sql.DB, ts_num int64) int64 {
 	val := reflect.ValueOf(i)
@@ -245,34 +286,33 @@ func insertData(i interface{}, tbl string, db *sql.DB, ts_num int64) int64 {
 		values = append(values, strconv.FormatInt(dataLogTimestamps[ts_num].id, 10))
 	}
 
-	if _, ok := preparedStmts[tbl]; !ok {
+	if _, ok := insertString[tbl]; !ok {
 		// Prepare the statement.
 		tblInsert := fmt.Sprintf("INSERT INTO %s (%s) VALUES(%s)", tbl, strings.Join(keys, ","),
 			strings.Join(strings.Split(strings.Repeat("?", len(keys)), ""), ","))
-		s, err := db.Prepare(tblInsert)
-		if err != nil {
-			log.Printf("insertData: db.Prepare() error (%s): %s\n", tblInsert, err.Error)
-			panic(err.Error())
-		}
-		preparedStmts[tbl] = s
+		insertString[tbl] = tblInsert
 	}
 
+	// Make the values slice into a slice of interface{}.
 	ifs := make([]interface{}, len(values))
 	for i := 0; i < len(values); i++ {
 		ifs[i] = values[i]
 	}
-	res, err := preparedStmts[tbl].Exec(ifs...)
-	if err != nil {
-		log.Printf("ERROR: %s\n", err.Error())
-	}
-	id, err := res.LastInsertId()
-	if err == nil {
-		if tbl == "timestamp" {
-			ts := dataLogTimestamps[ts_num]
-			ts.id = id
-			dataLogTimestamps[ts_num] = ts
+
+	insertBatchIfs[tbl] = append(insertBatchIfs[tbl], ifs)
+
+	if tbl == "timestamp" { // Immediate insert always for "timestamp" table.
+		res, err := bulkInsert("timestamp", db) // Bulk insert of 1, always.
+		if err == nil {
+			id, err := res.LastInsertId()
+			if err == nil {
+				ts := dataLogTimestamps[ts_num]
+				ts.id = id
+				dataLogTimestamps[ts_num] = ts
+				log.Printf("new ts=%d\n", id)
+			}
+			return id
 		}
-		return id
 	}
 
 	return 0
@@ -301,7 +341,12 @@ func dataLogWriter(db *sql.DB) {
 			// Accept timestamped row.
 			rowsQueuedForWrite = append(rowsQueuedForWrite, r)
 		case <-writeTicker.C:
+			for i := 0; i < 1000; i++ {
+				logSituation()
+			}
 			// Write the buffered rows. This will block while it is writing.
+			// Save the names of the tables affected so that we can run bulkInsert() on them after the insertData() calls.
+			tblsAffected := make(map[string]bool)
 			// Start transaction.
 			log.Printf("go %d\n", len(rowsQueuedForWrite))
 			tx, err := db.Begin()
@@ -310,8 +355,13 @@ func dataLogWriter(db *sql.DB) {
 				break // from select {}
 			}
 			for _, r := range rowsQueuedForWrite {
+				tblsAffected[r.tbl] = true
 				insertData(r.data, r.tbl, db, r.ts_num)
 			}
+			// Do the bulk inserts.
+			for tbl, _ := range tblsAffected {
+				bulkInsert(tbl, db)
+			}
 			// Close the transaction.
 			tx.Commit()
 			log.Printf("done\n")
@@ -419,6 +469,7 @@ func logDump1090TermMessage(m Dump1090TermMessage) {
 }
 
 func initDataLog() {
-	preparedStmts = make(map[string]*sql.Stmt)
+	insertString = make(map[string]string)
+	insertBatchIfs = make(map[string][][]interface{})
 	go dataLog()
 }
diff --git a/main/ry835ai.go b/main/ry835ai.go
index c8e67eaa..81b8fa2d 100644
--- a/main/ry835ai.go
+++ b/main/ry835ai.go
@@ -430,10 +430,13 @@
 	return is true if parse occurs correctly and position is valid.
 
 */
 
-func processNMEALine(l string) bool {
+func processNMEALine(l string) (sentenceUsed bool) {
 	mySituation.mu_GPS.Lock()
+
 	defer func() {
-		logSituation()
+		if sentenceUsed || globalSettings.DEBUG {
+			logSituation()
+		}
 		mySituation.mu_GPS.Unlock()
 	}()
@@ -622,7 +625,6 @@ func processNMEALine(l string) bool {
 				x[8+6*i] // lock time, sec, 0-64
 			*/
 			return true
-
 		} else if x[1] == "04" { // clock message
 			// field 5 is UTC week (epoch = 1980-JAN-06). If this is invalid, do not parse date / time
 			utcWeek, err0 := strconv.Atoi(x[5])
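
Note on the batching approach: bulkInsert() builds a multi-row statement of the form INSERT INTO tbl (cols...) VALUES (?,...,?), (?,...,?), ... by starting from the cached single-row string in insertString[tbl] and appending one "(?,...,?)" group per queued row, capping each statement at 10 rows (presumably to keep the bound-parameter count well below SQLite's default 999-variable limit). The Go sketch below is a minimal standalone illustration of that string/argument assembly only; buildBatchInsert, the "mySituation" table name, and the column names are hypothetical and not part of the patch, and the real bulkInsert() additionally executes each statement with db.Exec() and clears the per-table buffers afterward.

package main

import (
	"fmt"
	"strings"
)

// buildBatchInsert is a hypothetical helper (not part of the patch) showing how a
// multi-row INSERT can be assembled: one single-row statement plus one repeated
// "(?,...,?)" placeholder group per additional row, with all row values flattened
// into a single argument slice suitable for a single Exec() call.
// (For simplicity this sketch assumes at least one row.)
func buildBatchInsert(tbl string, cols []string, rows [][]interface{}) (string, []interface{}) {
	group := "(" + strings.TrimRight(strings.Repeat("?,", len(cols)), ",") + ")" // "(?,?,...)"
	// Single-row form, analogous to what the patch caches in insertString[tbl].
	stmt := fmt.Sprintf("INSERT INTO %s (%s) VALUES%s", tbl, strings.Join(cols, ","), group)
	args := make([]interface{}, 0, len(rows)*len(cols))
	for i, row := range rows {
		if i > 0 {
			stmt += ", " + group // Append one placeholder group per extra row.
		}
		args = append(args, row...)
	}
	return stmt, args
}

func main() {
	// Hypothetical table and columns, for illustration only.
	stmt, args := buildBatchInsert("mySituation",
		[]string{"Lat", "Lng", "Alt"},
		[][]interface{}{
			{44.10, -73.20, 1200},
			{44.11, -73.21, 1250},
		})
	fmt.Println(stmt) // INSERT INTO mySituation (Lat,Lng,Alt) VALUES(?,?,?), (?,?,?)
	fmt.Println(args) // [44.1 -73.2 1200 44.11 -73.21 1250]
}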