-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathcontroller_pws_weather.go
172 lines (141 loc) · 5.07 KB
/
controller_pws_weather.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package main
import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"sync"
"time"
"go.uber.org/zap"
)
// PWSWeatherController holds our connection along with some mutexes for operation
type PWSWeatherController struct {
ctx context.Context
wg *sync.WaitGroup
config *Config
PWSWeatherConfig PWSWeatherConfig
logger *zap.SugaredLogger
DB *TimescaleDBClient
}
// PWSWeatherConfig holds configuration for this controller
type PWSWeatherConfig struct {
StationID string `yaml:"station-id,omitempty"`
APIKey string `yaml:"api-key,omitempty"`
APIEndpoint string `yaml:"api-endpoint,omitempty"`
UploadInterval string `yaml:"upload-interval,omitempty"`
PullFromDevice string `yaml:"pull-from-device,omitempty"`
}
func NewPWSWeatherController(ctx context.Context, wg *sync.WaitGroup, c *Config, p PWSWeatherConfig, logger *zap.SugaredLogger) (*PWSWeatherController, error) {
pwsc := PWSWeatherController{
ctx: ctx,
wg: wg,
config: c,
PWSWeatherConfig: p,
logger: logger,
}
if pwsc.config.Storage.TimescaleDB.ConnectionString == "" {
return &PWSWeatherController{}, fmt.Errorf("TimescaleDB storage must be configured for the PWS Weather controller to function")
}
if pwsc.PWSWeatherConfig.StationID == "" {
return &PWSWeatherController{}, fmt.Errorf("station ID must be set")
}
if pwsc.PWSWeatherConfig.APIKey == "" {
return &PWSWeatherController{}, fmt.Errorf("API key must be set")
}
if pwsc.PWSWeatherConfig.PullFromDevice == "" {
return &PWSWeatherController{}, fmt.Errorf("pull-from-device must be set")
}
if pwsc.PWSWeatherConfig.APIEndpoint == "" {
pwsc.PWSWeatherConfig.APIEndpoint = "https://pwsupdate.pwsweather.com/api/v1/submitwx"
}
if pwsc.PWSWeatherConfig.UploadInterval == "" {
// Use a default interval of 60 seconds
pwsc.PWSWeatherConfig.UploadInterval = "60"
}
pwsc.DB = NewTimescaleDBClient(c, logger)
if !pwsc.DB.validatePullFromStation(pwsc.PWSWeatherConfig.PullFromDevice) {
return &PWSWeatherController{}, fmt.Errorf("pull-from-device %v is not a valid station name", pwsc.PWSWeatherConfig.PullFromDevice)
}
err := pwsc.DB.connectToTimescaleDB(c.Storage)
if err != nil {
return &PWSWeatherController{}, fmt.Errorf("could not connect to TimescaleDB: %v", err)
}
return &pwsc, nil
}
func (p *PWSWeatherController) StartController() error {
go p.sendPeriodicReports()
return nil
}
func (p *PWSWeatherController) sendPeriodicReports() {
p.wg.Add(1)
defer p.wg.Done()
submitInterval, err := time.ParseDuration(fmt.Sprintf("%vs", p.PWSWeatherConfig.UploadInterval))
if err != nil {
log.Errorf("error parsing duration: %v", err)
}
ticker := time.NewTicker(submitInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
log.Debug("Sending reading to PWS Weather...")
br, err := p.DB.getReadingsFromTimescaleDB(p.PWSWeatherConfig.PullFromDevice)
if err != nil {
log.Info("error getting readings from TimescaleDB:", err)
}
log.Debugf("readings fetched from TimescaleDB for PWS Weather: %+v", br)
err = p.sendReadingsToPWSWeather(&br)
if err != nil {
log.Errorf("error sending readings to PWS Weather: %v", err)
}
case <-p.ctx.Done():
return
}
}
}
func (p *PWSWeatherController) sendReadingsToPWSWeather(r *FetchedBucketReading) error {
v := url.Values{}
if r.Barometer == 0 && r.OutTemp == 0 {
return fmt.Errorf("rejecting likely faulty reading (temp %v, barometer %v)", r.OutTemp, r.Barometer)
}
// Add our authentication parameters to our URL
v.Set("ID", p.PWSWeatherConfig.StationID)
v.Set("PASSWORD", p.PWSWeatherConfig.APIKey)
now := time.Now().In(time.UTC)
v.Set("dateutc", now.Format("2006-01-02 15:04:05"))
// Set some values for our weather metrics
v.Set("winddir", strconv.FormatInt(int64(r.WindDir), 10))
v.Set("windspeedmph", strconv.FormatInt(int64(r.WindSpeed), 10))
v.Set("windgustmph", strconv.FormatInt(int64(r.MaxWindSpeed), 10))
v.Set("humidity", strconv.FormatInt(int64(r.OutHumidity), 10))
v.Set("tempf", fmt.Sprintf("%.1f", r.OutTemp))
v.Set("dailyrainin", fmt.Sprintf("%.2f", r.DayRain))
v.Set("baromin", fmt.Sprintf("%.2f", r.Barometer))
v.Set("solarradiation", fmt.Sprintf("%0.2f", r.SolarWatts))
v.Set("softwaretype", fmt.Sprintf("RemoteWeather-%v", version))
client := http.Client{
Timeout: 5 * time.Second,
}
req, err := http.NewRequest("GET", fmt.Sprint(p.PWSWeatherConfig.APIEndpoint+"?"+v.Encode()), nil)
if err != nil {
return fmt.Errorf("error creating PWS Weather HTTP request: %v", err)
}
log.Debugf("Making request to PWS weather: %v?%v", p.PWSWeatherConfig.APIEndpoint, v.Encode())
req = req.WithContext(p.ctx)
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("error sending report to PWS Weather: %v", err)
}
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return fmt.Errorf("error reading PWS Weather response body: %v", err)
}
if !bytes.Contains(body, []byte("success")) {
return fmt.Errorf("bad response from PWS Weather server: %v", string(body))
}
return nil
}