-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathstorage.go
152 lines (130 loc) · 3.8 KB
/
storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package main
import (
"context"
"fmt"
"sync"
)
// StorageManager holds our active storage backends and the channel through
// which readings are handed to them.
type StorageManager struct {
// Engines lists the storage backends that have been started. AddEngine
// appends to it and startReadingDistributor ranges over it.
// NOTE(review): nothing in this file synchronizes access to this slice, even
// though it is appended to by AddEngine and read by the distributor goroutine.
Engines []StorageEngine
// ReadingDistributor is the intake channel: startReadingDistributor receives
// each Reading from it and forwards that Reading to every engine's C channel.
// NewStorageManager creates it with a buffer of 20.
ReadingDistributor chan Reading
}
// StorageEngine pairs a backend storage engine's interface with the channel
// used for passing readings to that engine.
type StorageEngine struct {
// Engine is the backend implementation.
Engine StorageEngineInterface
// C is the send-only channel returned by Engine.StartStorageEngine;
// startReadingDistributor sends every Reading it receives into it.
C chan<- Reading
}
// StorageEngineInterface is the interface every storage backend implements so
// that the StorageManager can start it and feed it readings.
type StorageEngineInterface interface {
// StartStorageEngine starts the backend and returns the send-only channel on
// which the backend is given readings. AddEngine stores that channel in
// StorageEngine.C.
// NOTE(review): implementations are not visible here — presumably they use
// the context for cancellation and register their goroutines with the
// WaitGroup; confirm against each backend.
StartStorageEngine(context.Context, *sync.WaitGroup) chan<- Reading
}
// NewStorageManager creates a StorageManager, starts every storage backend
// that is enabled in the configuration, and launches the reading distributor.
//
// A backend is enabled when its identifying configuration field is set:
//
//   - TimescaleDB: c.Storage.TimescaleDB.ConnectionString is non-empty
//   - InfluxDB:    c.Storage.InfluxDB.Host is non-empty
//   - gRPC:        c.Storage.GRPC.Port is non-zero
//   - REST server: c.Storage.RESTServer.Port is non-zero
//   - APRS:        c.Storage.APRS.Callsign is non-empty
//
// Backends are added in the order listed above and setup stops at the first
// failure. The returned error wraps the backend's own error, so callers can
// inspect it with errors.Is / errors.As. The returned *StorageManager is
// non-nil on every path and its distributor goroutine is always started.
func NewStorageManager(ctx context.Context, wg *sync.WaitGroup, c *Config) (*StorageManager, error) {
	s := &StorageManager{
		// Channel for passing readings to the reading distributor.
		ReadingDistributor: make(chan Reading, 20),
	}

	// The distributor ranges over s.Engines without any locking, so it must
	// not start until every AddEngine call below has finished appending.
	// Deferring the launch guarantees that on all return paths.
	defer func() {
		// Register with the WaitGroup before the goroutine exists, so a
		// wg.Wait() issued right after this function returns cannot miss it.
		wg.Add(1)
		go func() {
			defer wg.Done()
			// The distributor's return value is not used.
			_ = s.startReadingDistributor(ctx, wg)
		}()
	}()

	// Supported storage backends, in the order they are enabled.
	backends := []struct {
		enabled bool
		engine  string // name understood by AddEngine
		label   string // human-readable name used in error messages
	}{
		{c.Storage.TimescaleDB.ConnectionString != "", "timescaledb", "TimescaleDB"},
		{c.Storage.InfluxDB.Host != "", "influxdb", "InfluxDB"},
		{c.Storage.GRPC.Port != 0, "grpc", "gRPC"},
		{c.Storage.RESTServer.Port != 0, "rest", "REST server"},
		{c.Storage.APRS.Callsign != "", "aprs", "APRS"},
	}

	for _, b := range backends {
		if !b.enabled {
			continue
		}
		if err := s.AddEngine(ctx, wg, b.engine, c); err != nil {
			return s, fmt.Errorf("could not add %s storage backend: %w", b.label, err)
		}
	}

	return s, nil
}
// AddEngine constructs the storage backend identified by engineName, starts
// it, and appends it to s.Engines.
//
// Recognized names are "timescaledb", "influxdb", "grpc", "rest" and "aprs".
// An error is returned when the backend's constructor fails or when
// engineName is not recognized; in both cases nothing is started and
// s.Engines is left unchanged.
//
// NOTE(review): s.Engines is appended to without locking, while the reading
// distributor goroutine ranges over the same slice.
func (s *StorageManager) AddEngine(ctx context.Context, wg *sync.WaitGroup, engineName string, c *Config) error {
	var (
		engine StorageEngineInterface
		err    error
	)

	// Only construction differs between backends; starting and registering
	// the engine is shared below.
	switch engineName {
	case "timescaledb":
		engine, err = NewTimescaleDBStorage(ctx, c)
	case "influxdb":
		engine, err = NewInfluxDBStorage(c)
	case "grpc":
		engine, err = NewGRPCStorage(ctx, c)
	case "rest":
		engine, err = NewRESTServerStorage(ctx, c)
	case "aprs":
		engine, err = NewAPRSStorage(c)
	default:
		// Report a misspelled or unsupported name instead of silently
		// succeeding without adding any backend.
		return fmt.Errorf("unknown storage engine %q", engineName)
	}
	if err != nil {
		return err
	}

	s.Engines = append(s.Engines, StorageEngine{
		Engine: engine,
		C:      engine.StartStorageEngine(ctx, wg),
	})
	return nil
}
// startReadingDistributor receives readings from s.ReadingDistributor and fans
// each one out to the channel of every engine in s.Engines, in slice order.
//
// It registers itself with wg on entry and marks itself done on return. It
// runs until ctx is cancelled and always returns nil. A reading that is only
// partly fanned out when cancellation arrives is dropped for the remaining
// engines.
func (s *StorageManager) startReadingDistributor(ctx context.Context, wg *sync.WaitGroup) error {
	wg.Add(1)
	defer wg.Done()

	for {
		select {
		case r := <-s.ReadingDistributor:
			for _, e := range s.Engines {
				// Watch for cancellation while sending: a backend whose
				// channel is full or no longer drained would otherwise block
				// this goroutine forever and wg.Done would never run.
				select {
				case e.C <- r:
				case <-ctx.Done():
					log.Info("cancellation request received. Cancelling reading distributor.")
					return nil
				}
			}
		case <-ctx.Done():
			log.Info("cancellation request received. Cancelling reading distributor.")
			return nil
		}
	}
}