瀏覽代碼

Added writer

Spencer Gardner 7 年之前
父節點
當前提交
6f16ea992c
共有 3 個文件被更改,包括 93 次插入56 次删除
  1. 2 2
      Dockerfile
  2. 18 54
      log.go
  3. 73 0
      writer/writer.go

+ 2 - 2
Dockerfile

@@ -37,9 +37,9 @@ ENV USER ""
 ENV PASSWORD ""
 ENV HOST http://localhost:8086
 ENV RTL_ARGS ""
-ENV LOCATION room
+ENV LOCATIONS ""
 
-CMD rtl_433 $RTL_ARGS | log-temperature -host $HOST -db $DB -password $PASSWORD -user $USER -location $LOCATION
+CMD rtl_433 $RTL_ARGS | log-temperature -host $HOST -db $DB -password $PASSWORD -user $USER $LOCATIONS
 
 ##############################
 

+ 18 - 54
log.go

@@ -6,12 +6,12 @@ import (
 	"errors"
 	"flag"
 	"fmt"
-	parser "log-temperature/parser"
+	"os"
 	"strconv"
 	"strings"
-	"time"
 
-	influx "github.com/influxdata/influxdb/client/v2"
+	"git.snppla.net/snppla/log-temperature/parser"
+	"git.snppla.net/snppla/log-temperature/writer"
 )
 
 type arrayFlags map[int]string
@@ -20,6 +20,7 @@ var db *string
 var username *string
 var password *string
 var host *string
+var test *bool
 var locations = arrayFlags{}
 
 func (i arrayFlags) String() string {
@@ -44,28 +45,29 @@ func main() {
 	username = flag.String("user", "", "Username")
 	password = flag.String("password", "", "Password")
 	host = flag.String("host", "http://localhost", "Host to connect to")
+	test = flag.Bool("test", false, "Use a test input string")
 	flag.Var(&locations, "location", "Id and location of the sensor (5:conference-room")
+
+	var reader *bufio.Reader
+
 	flag.Parse()
 
-	//reader := bufio.NewReader(os.Stdin)
-	reader := bufio.NewReader(strings.NewReader(jasonTestString))
-	client, err := influx.NewHTTPClient(influx.HTTPConfig{
-		Addr:     *host,
-		Username: *username,
-		Password: *password,
-	})
+	if *test {
+		reader = bufio.NewReader(strings.NewReader(jasonTestString))
+	} else {
+		reader = bufio.NewReader(os.Stdin)
+	}
+
+	parser := parser.JSONParser{Reader: reader}
+	writer, err := writer.NewInfluxWriter(*db, *host, *username, *password, locations)
 	if err != nil {
-		fmt.Println("Could not connect")
-		fmt.Println(err)
+		fmt.Println(err.Error())
 		return
 	}
-	_ = reader
-	parser := parser.JSONParser{reader}
 	for {
 		measurement, err := parser.Measure()
 		if measurement != nil {
-			logTemp(*measurement, client)
-			_ = client
+			writer.Write(*measurement)
 		}
 		if err != nil {
 			fmt.Println(err.Error())
@@ -74,44 +76,6 @@ func main() {
 	}
 }
 
-func logTemp(measurement parser.Measurement, client influx.Client) {
-	bp, err := influx.NewBatchPoints(influx.BatchPointsConfig{
-		Database:  *db,
-		Precision: "s",
-	})
-	if err != nil {
-		fmt.Println("Could not log temperature")
-		fmt.Println(err)
-		return
-	}
-	var tags = map[string]string{}
-	if val, ok := locations[measurement.ID]; ok {
-		tags["location"] = val
-
-	} else {
-		tags["location"] = "unknown"
-	}
-	fields := map[string]interface{}{
-		"temperature": measurement.Temperature,
-	}
-
-	pt, err := influx.NewPoint("temperature", tags, fields, time.Now())
-
-	if err != nil {
-		fmt.Println("Could not log temperature")
-		fmt.Println(err)
-		return
-	}
-	bp.AddPoint(pt)
-	err = client.Write(bp)
-
-	if err != nil {
-		fmt.Println("Could not log temperature")
-		fmt.Println(err)
-		return
-	}
-}
-
 const (
 	simpleTestString = `
 	2018-04-13 22:57:36 :   Acurite 606TX Sensor    :       -5

+ 73 - 0
writer/writer.go

@@ -0,0 +1,73 @@
+package writer
+
+import (
+	"fmt"
+	"time"
+
+	"git.snppla.net/snppla/log-temperature/parser"
+	influx "github.com/influxdata/influxdb/client/v2"
+)
+
+// Writer is used to write out measurements
+type Writer interface {
+	Write(parser.Measurement) error
+}
+
+type influxWriter struct {
+	locations map[int]string
+	db        string
+	client    influx.Client
+}
+
+func (i influxWriter) Write(measurement parser.Measurement) error {
+	bp, err := influx.NewBatchPoints(influx.BatchPointsConfig{
+		Database:  i.db,
+		Precision: "s",
+	})
+	if err != nil {
+		fmt.Println("Could not log temperature")
+		fmt.Println(err)
+		return err
+	}
+	var tags = map[string]string{}
+	if val, ok := i.locations[measurement.ID]; ok {
+		tags["location"] = val
+
+	} else {
+		tags["location"] = "id:" + string(measurement.ID)
+	}
+	fields := map[string]interface{}{
+		"temperature": measurement.Temperature,
+	}
+
+	pt, err := influx.NewPoint("temperature", tags, fields, time.Now())
+
+	if err != nil {
+		fmt.Println("Could not log temperature")
+		fmt.Println(err)
+		return err
+	}
+	bp.AddPoint(pt)
+	err = i.client.Write(bp)
+
+	if err != nil {
+		fmt.Println("Could not log temperature")
+		fmt.Println(err)
+		return err
+	}
+	return nil
+}
+
+// NewInfluxWriter creates an influxdb writer
+func NewInfluxWriter(db string, host string, username string, password string, locations map[int]string) (Writer, error) {
+	client, err := influx.NewHTTPClient(influx.HTTPConfig{
+		Addr:     host,
+		Username: username,
+		Password: password,
+	})
+	if err != nil {
+		return nil, err
+	}
+	return &influxWriter{db: db, client: client, locations: locations}, nil
+
+}