Use batched inserts instead of prepared statements.

pull/360/head
Christopher Young 2016-04-01 18:25:53 -04:00
rodzic f1f3977e5b
commit 62dd58b07f
2 zmienionych plików z 77 dodań i 24 usunięć

Wyświetl plik

@ -12,6 +12,7 @@ package main
import (
"database/sql"
"errors"
"fmt"
_ "github.com/mattn/go-sqlite3"
"log"
@ -203,6 +204,45 @@ func makeTable(i interface{}, tbl string, db *sql.DB) {
}
}
/*
bulkInsert().
Reads insertBatch and insertBatchIfs. This is called after a group of insertData() calls.
*/
func bulkInsert(tbl string, db *sql.DB) (res sql.Result, err error) {
if _, ok := insertString[tbl]; !ok {
return nil, errors.New("no insert statement")
}
batchVals := insertBatchIfs[tbl]
for len(batchVals) > 0 {
i := int(0) // Maximum of 10 rows per INSERT statement.
stmt := ""
vals := make([]interface{}, 0)
for len(batchVals) > 0 && i < 10 {
if len(stmt) == 0 { // The first set will be covered by insertString.
stmt = insertString[tbl]
} else {
stmt += ", (" + strings.Join(strings.Split(strings.Repeat("?", len(batchVals[0])), ""), ",") + ")"
}
vals = append(vals, batchVals[0]...)
batchVals = batchVals[1:]
i++
}
log.Printf("bulkInsert %d vals: %s\n", len(vals), stmt)
res, err = db.Exec(stmt, vals...)
if err != nil {
return
}
}
// Clear the buffers.
delete(insertString, tbl)
delete(insertBatchIfs, tbl)
return
}
/*
insertData().
Inserts an arbitrary struct into an SQLite table.
@ -210,8 +250,9 @@ func makeTable(i interface{}, tbl string, db *sql.DB) {
*/
// Cached prepared statements. Indexed by table name.
var preparedStmts map[string]*sql.Stmt
// Cached 'VALUES' statements. Indexed by table name.
var insertString map[string]string // INSERT INTO tbl (col1, col2, ...) VALUES(?, ?, ...). Only for one value.
var insertBatchIfs map[string][][]interface{}
func insertData(i interface{}, tbl string, db *sql.DB, ts_num int64) int64 {
val := reflect.ValueOf(i)
@ -245,34 +286,33 @@ func insertData(i interface{}, tbl string, db *sql.DB, ts_num int64) int64 {
values = append(values, strconv.FormatInt(dataLogTimestamps[ts_num].id, 10))
}
if _, ok := preparedStmts[tbl]; !ok {
if _, ok := insertString[tbl]; !ok {
// Prepare the statement.
tblInsert := fmt.Sprintf("INSERT INTO %s (%s) VALUES(%s)", tbl, strings.Join(keys, ","),
strings.Join(strings.Split(strings.Repeat("?", len(keys)), ""), ","))
s, err := db.Prepare(tblInsert)
if err != nil {
log.Printf("insertData: db.Prepare() error (%s): %s\n", tblInsert, err.Error)
panic(err.Error())
}
preparedStmts[tbl] = s
insertString[tbl] = tblInsert
}
// Make the values slice into a slice of interface{}.
ifs := make([]interface{}, len(values))
for i := 0; i < len(values); i++ {
ifs[i] = values[i]
}
res, err := preparedStmts[tbl].Exec(ifs...)
if err != nil {
log.Printf("ERROR: %s\n", err.Error())
}
id, err := res.LastInsertId()
if err == nil {
if tbl == "timestamp" {
ts := dataLogTimestamps[ts_num]
ts.id = id
dataLogTimestamps[ts_num] = ts
insertBatchIfs[tbl] = append(insertBatchIfs[tbl], ifs)
if tbl == "timestamp" { // Immediate insert always for "timestamp" table.
res, err := bulkInsert("timestamp", db) // Bulk insert of 1, always.
if err == nil {
id, err := res.LastInsertId()
if err == nil {
ts := dataLogTimestamps[ts_num]
ts.id = id
dataLogTimestamps[ts_num] = ts
log.Printf("new ts=%d\n", id)
}
return id
}
return id
}
return 0
@ -301,7 +341,12 @@ func dataLogWriter(db *sql.DB) {
// Accept timestamped row.
rowsQueuedForWrite = append(rowsQueuedForWrite, r)
case <-writeTicker.C:
for i := 0; i < 1000; i++ {
logSituation()
}
// Write the buffered rows. This will block while it is writing.
// Save the names of the tables affected so that we can run bulkInsert() on after the insertData() calls.
tblsAffected := make(map[string]bool)
// Start transaction.
log.Printf("go %d\n", len(rowsQueuedForWrite))
tx, err := db.Begin()
@ -310,8 +355,13 @@ func dataLogWriter(db *sql.DB) {
break // from select {}
}
for _, r := range rowsQueuedForWrite {
tblsAffected[r.tbl] = true
insertData(r.data, r.tbl, db, r.ts_num)
}
// Do the bulk inserts.
for tbl, _ := range tblsAffected {
bulkInsert(tbl, db)
}
// Close the transaction.
tx.Commit()
log.Printf("done\n")
@ -419,6 +469,7 @@ func logDump1090TermMessage(m Dump1090TermMessage) {
}
func initDataLog() {
preparedStmts = make(map[string]*sql.Stmt)
insertString = make(map[string]string)
insertBatchIfs = make(map[string][][]interface{})
go dataLog()
}

Wyświetl plik

@ -430,10 +430,13 @@ return is true if parse occurs correctly and position is valid.
*/
func processNMEALine(l string) bool {
func processNMEALine(l string) (sentenceUsed bool) {
mySituation.mu_GPS.Lock()
defer func() {
logSituation()
if sentenceUsed || globalSettings.DEBUG {
logSituation()
}
mySituation.mu_GPS.Unlock()
}()
@ -622,7 +625,6 @@ func processNMEALine(l string) bool {
x[8+6*i] // lock time, sec, 0-64
*/
return true
} else if x[1] == "04" { // clock message
// field 5 is UTC week (epoch = 1980-JAN-06). If this is invalid, do not parse date / time
utcWeek, err0 := strconv.Atoi(x[5])