From 6f0fa9932d4d664855fdacf7b97f6c73f5f56231 Mon Sep 17 00:00:00 2001 From: Caleb Lloyd <2414837+caleblloyd@users.noreply.github.com> Date: Wed, 15 Mar 2023 16:33:15 -0400 Subject: [PATCH] Connection Pool + Improve JetStream Advisories (#134) * connection pool Signed-off-by: Caleb Lloyd * reverse nil check Signed-off-by: Caleb Lloyd * fix linting Signed-off-by: Caleb Lloyd * pr comments Signed-off-by: Caleb Lloyd * fix spelling --------- Signed-off-by: Caleb Lloyd --- README.md | 80 ++-- go.mod | 3 + go.sum | 2 + surveyor/collector_statz.go | 6 +- surveyor/conn_pool.go | 269 +++++++++++++ surveyor/conn_pool_test.go | 90 +++++ surveyor/jetstream_advisories.go | 543 ++++++++++++++++++++++---- surveyor/jetstream_advisories_test.go | 471 +++++++++++++++++++++- surveyor/observation.go | 226 +++++------ surveyor/observation_test.go | 497 +++++++++++++++++++++-- surveyor/surveyor.go | 247 ++++++------ surveyor/surveyor_test.go | 428 +------------------- surveyor/testdata/badjs/badauth.json | 5 + surveyor/testdata/goodjs/global.json | 2 +- test/c.nkey | 1 + test/jetstream.conf | 37 ++ test/test.go | 11 +- test/test_test.go | 9 +- 18 files changed, 2063 insertions(+), 864 deletions(-) create mode 100644 surveyor/conn_pool.go create mode 100644 surveyor/conn_pool_test.go create mode 100644 surveyor/testdata/badjs/badauth.json create mode 100644 test/c.nkey diff --git a/README.md b/README.md index 6709922..53ad395 100644 --- a/README.md +++ b/README.md @@ -367,71 +367,55 @@ More information can be found [here](https://github.com/prometheus/prometheus/is ## Service Observations Services can be observed by creating JSON files in the `observations` directory. -Both jwt credential files and nkey seed files are supported. The name of the observation has to unique. A second observation with a duplicate name will be ignored. +The file extension must be `.json`. +Only one authentication method needs to be provided. +Example file format: -Here's an example using a jwt credential file: - -```json -{ - "name": "email.subscribe", - "topic": "monitor.email.subscribe", - "credential": "/observations/email.subscribe.cred" -} -``` -Example with nkey seed file: ```json { - "name": "email.subscribe", - "topic": "monitor.email.subscribe", - "nkey": "/observations/email.subscribe.nkey" + "name": "my service", + "topic": "email.subscribe.>", + "jwt": "jwt portion of creds, must include seed also", + "seed": "seed portion of creds, must include jwt also", + "credential": "/path/to/file.creds", + "nkey": "nkey seed", + "token": "token", + "username": "username, must include password also", + "password": "password, must include user also", + "tls_ca": "/path/to/ca.pem, defaults to surveyor's ca if one exists", + "tls_cert": "/path/to/cert.pem, defaults to surveyor's cert if one exists", + "tls_key": "/path/to/key.pem, defaults to surveyor's key if one exists" } ``` -Place this in `observations/email.surbscribe.json` and create a credential giving access to this topic in `observations/email.subscribe.cred`, when you restart the service any observations published by the NATS system will be tracked and graphed. +Files are watched and updated using [fsnotify](https://github.com/fsnotify/fsnotify) ## JetStream JetStream can be monitored on a per-account basis by creating JSON files in the `jetstream` directory. -Place those files in `jetstream/youraccount.json`. Be sure that you give access to the `$JS.EVENT.>` subject to your user. - -When you add/modify account files, you'll need restart the NATS Surveyor service in order for the JetStream in this account to be monitored. - -There are some ways to establish authentication, here are some examples: +The file extension must be `.json`. +Only one authentication method needs to be provided. +e sure that you give access to the `$JS.EVENT.>` subject to your user. +Example file format: ### Credentials ```json { - "name": "Your Account", - "credential": "/jetstream/youraccount.cred" + "name": "my account", + "jwt": "jwt portion of creds, must include seed also", + "seed": "seed portion of creds, must include jwt also", + "credential": "/path/to/file.creds", + "nkey": "nkey seed", + "token": "token", + "username": "username, must include password also", + "password": "password, must include user also", + "tls_ca": "/path/to/ca.pem, defaults to surveyor's ca if one exists", + "tls_cert": "/path/to/cert.pem, defaults to surveyor's cert if one exists", + "tls_key": "/path/to/key.pem, defaults to surveyor's key if one exists" } ``` -### User/Password -```json -{ - "name": "Your Account", - "username": "accounta", - "password": "changeit" -} -``` - -### NKeys -```json -{ - "name": "Your Account", - "nkey": "UDXU4RCSJNZOIQHZNWXHXORDPRTGNJAHAHFRGZNEEJCPQTT2M7NLCNF4" -} -``` -### mTLS - -```json -{ - "name": "Your Account", - "tls_ca": "/etc/nats-certs/your-account/ca.crt", - "tls_cert": "/etc/nats-certs/your-account/tls.crt", - "tls_key": "/etc/nats-certs/your-account/tls.key" -} -``` +Files are watched and updated using [fsnotify](https://github.com/fsnotify/fsnotify) ## TODO diff --git a/go.mod b/go.mod index 7eaad22..9f8bf65 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,9 @@ require ( github.com/sirupsen/logrus v1.9.0 github.com/spf13/cobra v1.5.0 github.com/spf13/viper v1.12.0 + github.com/stretchr/testify v1.7.1 golang.org/x/crypto v0.5.0 + golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f ) require ( @@ -34,6 +36,7 @@ require ( github.com/nats-io/nkeys v0.3.1-0.20221215194120-47c7408e7546 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.0.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect github.com/spf13/afero v1.8.2 // indirect diff --git a/go.sum b/go.sum index 11037b7..d0f4bbf 100644 --- a/go.sum +++ b/go.sum @@ -380,6 +380,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8= +golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/surveyor/collector_statz.go b/surveyor/collector_statz.go index 4b6fa6e..363c509 100644 --- a/surveyor/collector_statz.go +++ b/surveyor/collector_statz.go @@ -191,8 +191,10 @@ func jsDomainLabelValue(sm *server.ServerStatsMsg) string { func jetstreamInfoLabelValues(sm *server.ServerStatsMsg) []string { // Maybe also "meta_leader", "store_dir"? - return []string{sm.Server.Name, sm.Server.Host, sm.Server.ID, sm.Server.Cluster, jsDomainLabelValue(sm), sm.Server.Version, - strconv.FormatBool(sm.Server.JetStream)} + return []string{ + sm.Server.Name, sm.Server.Host, sm.Server.ID, sm.Server.Cluster, jsDomainLabelValue(sm), sm.Server.Version, + strconv.FormatBool(sm.Server.JetStream), + } } func (sc *StatzCollector) serverLabelValues(sm *server.ServerStatsMsg) []string { diff --git a/surveyor/conn_pool.go b/surveyor/conn_pool.go new file mode 100644 index 0000000..221d1a5 --- /dev/null +++ b/surveyor/conn_pool.go @@ -0,0 +1,269 @@ +package surveyor + +import ( + "crypto/sha256" + "encoding/json" + "fmt" + "os" + "sync" + + "github.com/nats-io/nats.go" + "github.com/sirupsen/logrus" + "golang.org/x/sync/singleflight" +) + +type natsContext struct { + Name string `json:"name"` + URL string `json:"url"` + JWT string `json:"jwt"` + Seed string `json:"seed"` + Credentials string `json:"credential"` + Nkey string `json:"nkey"` + Token string `json:"token"` + Username string `json:"username"` + Password string `json:"password"` + TLSCA string `json:"tls_ca"` + TLSCert string `json:"tls_cert"` + TLSKey string `json:"tls_key"` +} + +func (c *natsContext) copy() *natsContext { + if c == nil { + return nil + } + cp := *c + return &cp +} + +func (c *natsContext) hash() (string, error) { + b, err := json.Marshal(c) + if err != nil { + return "", fmt.Errorf("error marshaling context to json: %v", err) + } + if c.Nkey != "" { + fb, err := os.ReadFile(c.Nkey) + if err != nil { + return "", fmt.Errorf("error opening nkey file %s: %v", c.Nkey, err) + } + b = append(b, fb...) + } + if c.Credentials != "" { + fb, err := os.ReadFile(c.Credentials) + if err != nil { + return "", fmt.Errorf("error opening creds file %s: %v", c.Credentials, err) + } + b = append(b, fb...) + } + if c.TLSCA != "" { + fb, err := os.ReadFile(c.TLSCA) + if err != nil { + return "", fmt.Errorf("error opening ca file %s: %v", c.TLSCA, err) + } + b = append(b, fb...) + } + if c.TLSCert != "" { + fb, err := os.ReadFile(c.TLSCert) + if err != nil { + return "", fmt.Errorf("error opening cert file %s: %v", c.TLSCert, err) + } + b = append(b, fb...) + } + if c.TLSKey != "" { + fb, err := os.ReadFile(c.TLSKey) + if err != nil { + return "", fmt.Errorf("error opening key file %s: %v", c.TLSKey, err) + } + b = append(b, fb...) + } + hash := sha256.New() + hash.Write(b) + return fmt.Sprintf("%x", hash.Sum(nil)), nil +} + +type natsContextDefaults struct { + Name string + URL string + TLSCA string + TLSCert string + TLSKey string +} + +type pooledNatsConn struct { + nc *nats.Conn + cp *natsConnPool + key string + count uint64 + closed bool +} + +func (pc *pooledNatsConn) ReturnToPool() { + pc.cp.Lock() + pc.count-- + if pc.count == 0 { + if pooledConn, ok := pc.cp.cache[pc.key]; ok && pc == pooledConn { + delete(pc.cp.cache, pc.key) + } + pc.closed = true + pc.cp.Unlock() + pc.nc.Close() + return + } + pc.cp.Unlock() +} + +type natsConnPool struct { + sync.Mutex + cache map[string]*pooledNatsConn + logger *logrus.Logger + group *singleflight.Group + natsDefaults *natsContextDefaults + natsOpts []nats.Option +} + +func newNatsConnPool(logger *logrus.Logger, natsDefaults *natsContextDefaults, natsOpts []nats.Option) *natsConnPool { + return &natsConnPool{ + cache: map[string]*pooledNatsConn{}, + group: &singleflight.Group{}, + logger: logger, + natsDefaults: natsDefaults, + natsOpts: natsOpts, + } +} + +const getPooledConnMaxTries = 10 + +// Get returns a *pooledNatsConn +func (cp *natsConnPool) Get(cfg *natsContext) (*pooledNatsConn, error) { + if cfg == nil { + return nil, fmt.Errorf("nats context must not be nil") + } + + // copy cfg + cfg = cfg.copy() + + // set defaults + if cfg.Name == "" { + cfg.Name = cp.natsDefaults.Name + } + if cfg.URL == "" { + cfg.URL = cp.natsDefaults.URL + } + if cfg.TLSCA == "" { + cfg.TLSCA = cp.natsDefaults.TLSCA + } + if cfg.TLSCert == "" { + cfg.TLSCert = cp.natsDefaults.TLSCert + } + if cfg.TLSKey == "" { + cfg.TLSKey = cp.natsDefaults.TLSKey + } + + // get hash + key, err := cfg.hash() + if err != nil { + return nil, err + } + + for i := 0; i < getPooledConnMaxTries; i++ { + connection, err := cp.getPooledConn(key, cfg) + if err != nil { + return nil, err + } + + cp.Lock() + if connection.closed { + // ReturnToPool closed this while lock not held, try again + cp.Unlock() + continue + } + + // increment count out of the pool + connection.count++ + cp.Unlock() + return connection, nil + } + + return nil, fmt.Errorf("failed to get pooled connection after %d attempts", getPooledConnMaxTries) +} + +// getPooledConn gets or establishes a *pooledNatsConn in a singleflight group, but does not increment its count +func (cp *natsConnPool) getPooledConn(key string, cfg *natsContext) (*pooledNatsConn, error) { + conn, err, _ := cp.group.Do(key, func() (interface{}, error) { + cp.Lock() + pooledConn, ok := cp.cache[key] + if ok && pooledConn.nc.IsConnected() { + cp.Unlock() + return pooledConn, nil + } + cp.Unlock() + + opts := cp.natsOpts + opts = append(opts, func(options *nats.Options) error { + if cfg.Name != "" { + options.Name = cfg.Name + } + if cfg.Token != "" { + options.Token = cfg.Token + } + if cfg.Username != "" { + options.User = cfg.Username + } + if cfg.Password != "" { + options.Password = cfg.Password + } + return nil + }) + + if cfg.JWT != "" && cfg.Seed != "" { + opts = append(opts, nats.UserJWTAndSeed(cfg.JWT, cfg.Seed)) + } + + if cfg.Nkey != "" { + opt, err := nats.NkeyOptionFromSeed(cfg.Nkey) + if err != nil { + return nil, fmt.Errorf("unable to load nkey: %v", err) + } + opts = append(opts, opt) + } + + if cfg.Credentials != "" { + opts = append(opts, nats.UserCredentials(cfg.Credentials)) + } + + if cfg.TLSCA != "" { + opts = append(opts, nats.RootCAs(cfg.TLSCA)) + } + + if cfg.TLSCert != "" && cfg.TLSKey != "" { + opts = append(opts, nats.ClientCert(cfg.TLSCert, cfg.TLSKey)) + } + + nc, err := nats.Connect(cfg.URL, opts...) + if err != nil { + return nil, err + } + cp.logger.Infof("%s connected to NATS Deployment: %s", cfg.Name, nc.ConnectedAddr()) + + connection := &pooledNatsConn{ + nc: nc, + cp: cp, + key: key, + } + + cp.Lock() + cp.cache[key] = connection + cp.Unlock() + + return connection, err + }) + + if err != nil { + return nil, err + } + + connection, ok := conn.(*pooledNatsConn) + if !ok { + return nil, fmt.Errorf("not a pooledNatsConn") + } + return connection, nil +} diff --git a/surveyor/conn_pool_test.go b/surveyor/conn_pool_test.go new file mode 100644 index 0000000..733e447 --- /dev/null +++ b/surveyor/conn_pool_test.go @@ -0,0 +1,90 @@ +package surveyor + +import ( + "sync" + "testing" + "time" + + "github.com/nats-io/nats.go" + + natsservertest "github.com/nats-io/nats-server/v2/test" + "github.com/sirupsen/logrus" + testifyAssert "github.com/stretchr/testify/assert" +) + +func TestConnPool(t *testing.T) { + s := natsservertest.RunRandClientPortServer() + defer s.Shutdown() + o1 := &natsContext{ + Name: "Client 1", + } + o2 := &natsContext{ + Name: "Client 1", + } + o3 := &natsContext{ + Name: "Client 2", + } + + natsDefaults := &natsContextDefaults{ + URL: s.ClientURL(), + } + natsOptions := []nats.Option{ + nats.MaxReconnects(10240), + } + cp := newNatsConnPool(logrus.New(), natsDefaults, natsOptions) + + var c1, c2, c3 *pooledNatsConn + var c1e, c2e, c3e error + wg := &sync.WaitGroup{} + wg.Add(3) + go func() { + c1, c1e = cp.Get(o1) + wg.Done() + }() + go func() { + c2, c2e = cp.Get(o2) + wg.Done() + }() + go func() { + c3, c3e = cp.Get(o3) + wg.Done() + }() + wg.Wait() + + assert := testifyAssert.New(t) + if assert.NoError(c1e) && assert.NoError(c2e) { + assert.Same(c1, c2) + } + if assert.NoError(c3e) { + assert.NotSame(c1, c3) + assert.NotSame(c2, c3) + } + + c1.ReturnToPool() + c3.ReturnToPool() + time.Sleep(1 * time.Second) + assert.False(c1.nc.IsClosed()) + assert.False(c2.nc.IsClosed()) + assert.True(c3.nc.IsClosed()) + + c4, c4e := cp.Get(o1) + if assert.NoError(c4e) { + assert.Same(c2, c4) + } + + c2.ReturnToPool() + c4.ReturnToPool() + time.Sleep(1 * time.Second) + assert.True(c1.nc.IsClosed()) + assert.True(c2.nc.IsClosed()) + assert.True(c4.nc.IsClosed()) + + c5, c5e := cp.Get(o1) + if assert.NoError(c5e) { + assert.NotSame(c1, c5) + } + + c5.ReturnToPool() + time.Sleep(1 * time.Second) + assert.True(c5.nc.IsClosed()) +} diff --git a/surveyor/jetstream_advisories.go b/surveyor/jetstream_advisories.go index 6e6d0ae..acb6437 100644 --- a/surveyor/jetstream_advisories.go +++ b/surveyor/jetstream_advisories.go @@ -16,16 +16,21 @@ package surveyor import ( "encoding/json" "fmt" + "io/fs" "os" + "path/filepath" "regexp" "strings" + "sync" "time" + "github.com/fsnotify/fsnotify" + "github.com/nats-io/nats.go" + "github.com/nats-io/jsm.go" "github.com/nats-io/jsm.go/api" "github.com/nats-io/jsm.go/api/jetstream/advisory" "github.com/nats-io/jsm.go/api/jetstream/metric" - "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -222,39 +227,39 @@ func NewJetStreamAdvisoryMetrics(registry *prometheus.Registry, constLabels prom return metrics } -// JSAdvisoryListener listens for JetStream advisories and expose them as prometheus data -type JSAdvisoryListener struct { - nc *nats.Conn - logger *logrus.Logger - opts *jsAdvisoryOptions - metrics *JSAdvisoryMetrics - sopts *Options -} +type JSAdvisoryConfig struct { + // unique identifier + ID string `json:"id"` -type jsAdvisoryOptions struct { + // account name AccountName string `json:"name"` + + // connection options + JWT string `json:"jwt"` + Seed string `json:"seed"` Credentials string `json:"credential"` + Nkey string `json:"nkey"` + Token string `json:"token"` Username string `json:"username"` Password string `json:"password"` - NKey string `json:"nkey"` + TLSCA string `json:"tls_ca"` TLSCert string `json:"tls_cert"` TLSKey string `json:"tls_key"` - TLSCA string `json:"tls_ca"` } -// Validate checks the options meet our expectations -func (o *jsAdvisoryOptions) Validate() error { - errs := []string{} +// Validate is used to validate a JSAdvisoryConfig +func (o *JSAdvisoryConfig) Validate() error { + if o == nil { + return fmt.Errorf("js advisory config cannot be nil") + } - if o.AccountName == "" { - errs = append(errs, "name is required") + var errs []string + if o.ID == "" { + errs = append(errs, "id is required") } - if o.Credentials != "" { - _, err := os.Stat(o.Credentials) - if err != nil { - errs = append(errs, fmt.Sprintf("invalid credential file: %s", err)) - } + if o.AccountName == "" { + errs = append(errs, "name is required") } if len(errs) == 0 { @@ -264,68 +269,111 @@ func (o *jsAdvisoryOptions) Validate() error { return fmt.Errorf(strings.Join(errs, ", ")) } -// NewJetStreamAdvisoryListener creates a new JetStream advisory reporter -func NewJetStreamAdvisoryListener(f string, sopts Options, metrics *JSAdvisoryMetrics, reconnectCtr *prometheus.CounterVec) (*JSAdvisoryListener, error) { +func (o *JSAdvisoryConfig) copy() *JSAdvisoryConfig { + if o == nil { + return nil + } + cp := *o + return &cp +} + +// NewJetStreamAdvisoryConfigFromFile creates a new JSAdvisoryConfig from a file +// the ID of the JSAdvisoryConfig is set to the filename f +func NewJetStreamAdvisoryConfigFromFile(f string) (*JSAdvisoryConfig, error) { js, err := os.ReadFile(f) if err != nil { return nil, err } - opts := &jsAdvisoryOptions{} - err = json.Unmarshal(js, opts) + jsa := &JSAdvisoryConfig{} + err = json.Unmarshal(js, jsa) if err != nil { - return nil, fmt.Errorf("invalid JetStream advisory configuration: %s: %s", f, err) + return nil, fmt.Errorf("invalid JetStream advisory config: %s: %s", f, err) } - - err = opts.Validate() + jsa.ID = f + err = jsa.Validate() if err != nil { - return nil, fmt.Errorf("invalid JetStream advisory configuration: %s: %s", f, err) + return nil, fmt.Errorf("invalid JetStream advisory config: %s: %s", f, err) } - sopts.Name = fmt.Sprintf("%s (jetstream %s)", sopts.Name, opts.AccountName) - sopts.Credentials = opts.Credentials - sopts.NATSUser = opts.Username - sopts.NATSPassword = opts.Password - sopts.Nkey = opts.NKey + return jsa, nil +} - if opts.TLSKey != "" && opts.TLSCert != "" && opts.TLSCA != "" { - sopts.CaFile = opts.TLSCA - sopts.CertFile = opts.TLSCert - sopts.KeyFile = opts.TLSKey - } +// jsAdvisoryListener listens for JetStream advisories and expose them as prometheus data +type jsAdvisoryListener struct { + sync.Mutex + config *JSAdvisoryConfig + cp *natsConnPool + logger *logrus.Logger + metrics *JSAdvisoryMetrics + pc *pooledNatsConn + subAdvisory *nats.Subscription + subMetric *nats.Subscription +} - nc, err := connect(&sopts, reconnectCtr) +func newJetStreamAdvisoryListener(config *JSAdvisoryConfig, cp *natsConnPool, logger *logrus.Logger, metrics *JSAdvisoryMetrics) (*jsAdvisoryListener, error) { + err := config.Validate() if err != nil { - return nil, fmt.Errorf("nats connection failed: %s", err) + return nil, fmt.Errorf("invalid JetStream advisory config for id: %s, account name: %s, error: %v", config.ID, config.AccountName, err) } - return &JSAdvisoryListener{ - nc: nc, - logger: sopts.Logger, - opts: opts, + return &jsAdvisoryListener{ + config: config, + cp: cp, + logger: logger, metrics: metrics, - sopts: &sopts, }, nil } -// Start starts listening for observations -func (o *JSAdvisoryListener) Start() error { - _, err := o.nc.Subscribe(api.JSAdvisoryPrefix+".>", o.advisoryHandler) +func (o *jsAdvisoryListener) natsContext() *natsContext { + natsCtx := &natsContext{ + JWT: o.config.JWT, + Seed: o.config.Seed, + Credentials: o.config.Credentials, + Nkey: o.config.Nkey, + Token: o.config.Token, + Username: o.config.Username, + Password: o.config.Password, + TLSCA: o.config.TLSCA, + TLSCert: o.config.TLSCert, + TLSKey: o.config.TLSKey, + } + + return natsCtx +} + +// Start starts listening for JetStream advisories +func (o *jsAdvisoryListener) Start() error { + o.Lock() + defer o.Unlock() + if o.pc != nil { + // already started + return nil + } + + pc, err := o.cp.Get(o.natsContext()) if err != nil { - return fmt.Errorf("could not subscribe to JetStream Advisory topic for %s (%s): %s", o.opts.AccountName, api.JSAdvisoryPrefix, err) + return fmt.Errorf("nats connection failed for id: %s, account name: %s, error: %v", o.config.ID, o.config.AccountName, err) } - o.logger.Infof("Started JetStream Advisory listener stats on %s.> for %s", api.JSAdvisoryPrefix, o.opts.AccountName) - _, err = o.nc.Subscribe(api.JSMetricPrefix+".>", o.advisoryHandler) + subAdvisory, err := pc.nc.Subscribe(api.JSAdvisoryPrefix+".>", o.advisoryHandler) if err != nil { - return fmt.Errorf("could not subscribe to JetStream Advisory topic for %s (%s): %s", o.opts.AccountName, api.JSMetricPrefix, err) + pc.ReturnToPool() + return fmt.Errorf("could not subscribe to JetStream advisory for id: %s, account name: %s, topic: %s, error: %v", o.config.ID, o.config.AccountName, api.JSAdvisoryPrefix, err) } - o.logger.Infof("Started JetStream Metric listener stats on %s.> for %s", api.JSMetricPrefix, o.opts.AccountName) - _ = o.nc.Flush() + subMetric, err := pc.nc.Subscribe(api.JSMetricPrefix+".>", o.advisoryHandler) + if err != nil { + _ = subAdvisory.Unsubscribe() + pc.ReturnToPool() + return fmt.Errorf("could not subscribe to JetStream advisory for id: %s, account name: %s, topic: %s, error: %v", o.config.ID, o.config.AccountName, api.JSMetricPrefix, err) + } + o.pc = pc + o.subAdvisory = subAdvisory + o.subMetric = subMetric + o.logger.Infof("started JetStream advisory for id: %s, account name: %s, advisory topic: %s, metric topic: %s", o.config.ID, o.config.AccountName, api.JSAdvisoryPrefix, api.JSMetricPrefix) o.metrics.jsAdvisoriesGauge.Inc() - return nil } @@ -365,71 +413,404 @@ func limitJSSubject(subj string) string { return subj } -func (o *JSAdvisoryListener) advisoryHandler(m *nats.Msg) { +func (o *jsAdvisoryListener) advisoryHandler(m *nats.Msg) { schema, event, err := jsm.ParseEvent(m.Data) if err != nil { - o.metrics.jsAdvisoryParseErrorCtr.WithLabelValues(o.opts.AccountName).Inc() + o.metrics.jsAdvisoryParseErrorCtr.WithLabelValues(o.config.AccountName).Inc() o.logger.Warnf("Could not parse JetStream API Audit Advisory: %s", err) return } - o.metrics.jsTotalAdvisoryCtr.WithLabelValues(o.opts.AccountName).Inc() + o.metrics.jsTotalAdvisoryCtr.WithLabelValues(o.config.AccountName).Inc() switch event := event.(type) { case *advisory.JetStreamAPIAuditV1: - o.metrics.jsAPIAuditCtr.WithLabelValues(limitJSSubject(event.Subject), o.opts.AccountName).Inc() + o.metrics.jsAPIAuditCtr.WithLabelValues(limitJSSubject(event.Subject), o.config.AccountName).Inc() case *advisory.ConsumerDeliveryExceededAdvisoryV1: - o.metrics.jsDeliveryExceededCtr.WithLabelValues(o.opts.AccountName, event.Stream, event.Consumer).Add(float64(event.Deliveries)) + o.metrics.jsDeliveryExceededCtr.WithLabelValues(o.config.AccountName, event.Stream, event.Consumer).Add(float64(event.Deliveries)) case *metric.ConsumerAckMetricV1: - o.metrics.jsAckMetricDelay.WithLabelValues(o.opts.AccountName, event.Stream, event.Consumer).Observe(time.Duration(event.Delay).Seconds()) - o.metrics.jsAckMetricDeliveries.WithLabelValues(o.opts.AccountName, event.Stream, event.Consumer).Add(float64(event.Deliveries)) + o.metrics.jsAckMetricDelay.WithLabelValues(o.config.AccountName, event.Stream, event.Consumer).Observe(time.Duration(event.Delay).Seconds()) + o.metrics.jsAckMetricDeliveries.WithLabelValues(o.config.AccountName, event.Stream, event.Consumer).Add(float64(event.Deliveries)) case *advisory.JSConsumerActionAdvisoryV1: - o.metrics.jsConsumerActionCtr.WithLabelValues(o.opts.AccountName, event.Stream, event.Action.String()).Inc() + o.metrics.jsConsumerActionCtr.WithLabelValues(o.config.AccountName, event.Stream, event.Action.String()).Inc() case *advisory.JSStreamActionAdvisoryV1: - o.metrics.jsStreamActionCtr.WithLabelValues(o.opts.AccountName, event.Stream, event.Action.String()).Inc() + o.metrics.jsStreamActionCtr.WithLabelValues(o.config.AccountName, event.Stream, event.Action.String()).Inc() case *advisory.JSConsumerDeliveryTerminatedAdvisoryV1: - o.metrics.jsDeliveryTerminatedCtr.WithLabelValues(o.opts.AccountName, event.Stream, event.Consumer).Inc() + o.metrics.jsDeliveryTerminatedCtr.WithLabelValues(o.config.AccountName, event.Stream, event.Consumer).Inc() case *advisory.JSRestoreCreateAdvisoryV1: - o.metrics.jsRestoreCreatedCtr.WithLabelValues(o.opts.AccountName, event.Stream).Inc() + o.metrics.jsRestoreCreatedCtr.WithLabelValues(o.config.AccountName, event.Stream).Inc() case *advisory.JSRestoreCompleteAdvisoryV1: - o.metrics.jsRestoreSizeCtr.WithLabelValues(o.opts.AccountName, event.Stream).Add(float64(event.Bytes)) - o.metrics.jsRestoreDuration.WithLabelValues(o.opts.AccountName, event.Stream).Observe(event.End.Sub(event.Start).Seconds()) + o.metrics.jsRestoreSizeCtr.WithLabelValues(o.config.AccountName, event.Stream).Add(float64(event.Bytes)) + o.metrics.jsRestoreDuration.WithLabelValues(o.config.AccountName, event.Stream).Observe(event.End.Sub(event.Start).Seconds()) case *advisory.JSSnapshotCreateAdvisoryV1: - o.metrics.jsSnapshotSizeCtr.WithLabelValues(o.opts.AccountName, event.Stream).Add(float64(event.BlkSize * event.NumBlks)) + o.metrics.jsSnapshotSizeCtr.WithLabelValues(o.config.AccountName, event.Stream).Add(float64(event.BlkSize * event.NumBlks)) case *advisory.JSSnapshotCompleteAdvisoryV1: - o.metrics.jsSnapthotDuration.WithLabelValues(o.opts.AccountName, event.Stream).Observe(event.End.Sub(event.Start).Seconds()) + o.metrics.jsSnapthotDuration.WithLabelValues(o.config.AccountName, event.Stream).Observe(event.End.Sub(event.Start).Seconds()) case *advisory.JSConsumerLeaderElectedV1: - o.metrics.jsConsumerLeaderElected.WithLabelValues(o.opts.AccountName, event.Stream).Inc() + o.metrics.jsConsumerLeaderElected.WithLabelValues(o.config.AccountName, event.Stream).Inc() case *advisory.JSConsumerQuorumLostV1: - o.metrics.jsConsumerQuorumLost.WithLabelValues(o.opts.AccountName, event.Stream).Inc() + o.metrics.jsConsumerQuorumLost.WithLabelValues(o.config.AccountName, event.Stream).Inc() case *advisory.JSStreamLeaderElectedV1: - o.metrics.jsStreamLeaderElected.WithLabelValues(o.opts.AccountName, event.Stream).Inc() + o.metrics.jsStreamLeaderElected.WithLabelValues(o.config.AccountName, event.Stream).Inc() case *advisory.JSStreamQuorumLostV1: - o.metrics.jsStreamQuorumLost.WithLabelValues(o.opts.AccountName, event.Stream).Inc() + o.metrics.jsStreamQuorumLost.WithLabelValues(o.config.AccountName, event.Stream).Inc() case *advisory.JSConsumerDeliveryNakAdvisoryV1: - o.metrics.jsConsumerDeliveryNAK.WithLabelValues(o.opts.AccountName, event.Stream, event.Consumer).Inc() + o.metrics.jsConsumerDeliveryNAK.WithLabelValues(o.config.AccountName, event.Stream, event.Consumer).Inc() default: - o.metrics.jsUnknownAdvisoryCtr.WithLabelValues(schema, o.opts.AccountName).Inc() + o.metrics.jsUnknownAdvisoryCtr.WithLabelValues(schema, o.config.AccountName).Inc() o.logger.Warnf("Could not handle event as an JetStream Advisory with schema %s", schema) } } -// Stop closes the connection to the network -func (o *JSAdvisoryListener) Stop() { - o.nc.Close() +// Stop stops listening for JetStream advisories +func (o *jsAdvisoryListener) Stop() { + o.Lock() + defer o.Unlock() + if o.pc == nil { + // already stopped + return + } + + if o.subAdvisory != nil { + _ = o.subAdvisory.Unsubscribe() + o.subAdvisory = nil + } + + if o.subMetric != nil { + _ = o.subMetric.Unsubscribe() + o.subMetric = nil + } + + o.metrics.jsAdvisoriesGauge.Dec() + o.pc.ReturnToPool() + o.pc = nil +} + +// JSAdvisoryManager exposes methods to operate on JetStream advisories +type JSAdvisoryManager struct { + sync.Mutex + cp *natsConnPool + listenerMap map[string]*jsAdvisoryListener + logger *logrus.Logger + metrics *JSAdvisoryMetrics +} + +// newJetStreamAdvisoryManager creates a JSAdvisoryManager for managing JetStream advisories +func newJetStreamAdvisoryManager(cp *natsConnPool, logger *logrus.Logger, metrics *JSAdvisoryMetrics) *JSAdvisoryManager { + return &JSAdvisoryManager{ + cp: cp, + logger: logger, + metrics: metrics, + } +} + +func (am *JSAdvisoryManager) start() { + am.Lock() + defer am.Unlock() + if am.listenerMap != nil { + // already started + return + } + + am.listenerMap = map[string]*jsAdvisoryListener{} +} + +// IsRunning returns true if the advisory manager is running or false if it is stopped +func (am *JSAdvisoryManager) IsRunning() bool { + am.Lock() + defer am.Unlock() + return am.listenerMap != nil +} + +func (am *JSAdvisoryManager) stop() { + am.Lock() + defer am.Unlock() + if am.listenerMap == nil { + // already stopped + return + } + + for _, adv := range am.listenerMap { + adv.Stop() + } + am.metrics.jsAdvisoriesGauge.Set(0) + am.listenerMap = nil +} + +// ConfigMap returns a map of id:*JSAdvisoryConfig for all running advisories +func (am *JSAdvisoryManager) ConfigMap() map[string]*JSAdvisoryConfig { + am.Lock() + defer am.Unlock() + + advMap := make(map[string]*JSAdvisoryConfig, len(am.listenerMap)) + if am.listenerMap == nil { + return advMap + } + + for id, adv := range am.listenerMap { + // copy so that internal references cannot be changed + advMap[id] = adv.config.copy() + } + + return advMap +} + +// Set creates or updates an advisory +// if an advisory exists with the same ID, it is updated +// otherwise, a new advisory is created +func (am *JSAdvisoryManager) Set(config *JSAdvisoryConfig) error { + err := config.Validate() + if err != nil { + return err + } + + // copy so that internal references cannot be changed + config = config.copy() + + am.Lock() + if am.listenerMap == nil { + am.Unlock() + return fmt.Errorf("advisory manager is stopped; could not set advisory for id: %s, account name: %s", config.ID, config.AccountName) + } + + existingAdv, found := am.listenerMap[config.ID] + am.Unlock() + + if found && *config == *existingAdv.config { + return nil + } + + adv, err := newJetStreamAdvisoryListener(config, am.cp, am.logger, am.metrics) + if err != nil { + return fmt.Errorf("could not set advisory for id: %s, account name: %s, error: %v", config.ID, config.AccountName, err) + } + + if err := adv.Start(); err != nil { + return fmt.Errorf("could not start advisory for id: %s, account name: %s, error: %v", config.ID, config.AccountName, err) + } + + am.Lock() + if am.listenerMap == nil { + am.Unlock() + adv.Stop() + return fmt.Errorf("advisory manager is stopped; could not set advisory for id: %s, account name: %s", config.ID, config.AccountName) + } + + am.listenerMap[config.ID] = adv + am.Unlock() + + if found { + existingAdv.Stop() + } + return nil +} + +// Delete deletes existing advisory with provided ID +func (am *JSAdvisoryManager) Delete(id string) error { + am.Lock() + if am.listenerMap == nil { + am.Unlock() + return fmt.Errorf("advisory manager is stopped; could not delete advisory id: %s", id) + } + + existingAdv, found := am.listenerMap[id] + if !found { + am.Unlock() + return fmt.Errorf("advisory with given ID does not exist: %s", id) + } + + delete(am.listenerMap, id) + am.Unlock() + + existingAdv.Stop() + return nil +} + +type jsAdvisoryFSWatcher struct { + sync.Mutex + am *JSAdvisoryManager + logger *logrus.Logger + stopCh chan struct{} + watcher *fsnotify.Watcher +} + +func newJetStreamAdvisoryFSWatcher(logger *logrus.Logger, am *JSAdvisoryManager) *jsAdvisoryFSWatcher { + return &jsAdvisoryFSWatcher{ + logger: logger, + am: am, + } +} + +func (w *jsAdvisoryFSWatcher) start() error { + w.Lock() + defer w.Unlock() + if w.watcher != nil { + return nil + } + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return err + } + stopCh := make(chan struct{}, 1) + + go func() { + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + if err := w.handleWatcherEvent(event); err != nil { + w.logger.Warn(err) + } + case <-stopCh: + return + } + } + }() + + w.watcher = watcher + w.stopCh = stopCh + return nil +} + +func (w *jsAdvisoryFSWatcher) stop() { + w.Lock() + defer w.Unlock() + + if w.watcher == nil { + return + } + + w.stopCh <- struct{}{} + _ = w.watcher.Close() + w.watcher = nil +} + +func (w *jsAdvisoryFSWatcher) startAdvisoriesInDir() fs.WalkDirFunc { + return func(path string, info fs.DirEntry, err error) error { + if err != nil { + return err + } + + // skip directories starting with '..' + // this prevents double advisory loading when using kubernetes mounts + if info.IsDir() && strings.HasPrefix(info.Name(), "..") { + return filepath.SkipDir + } + + if info.IsDir() { + w.Lock() + if w.watcher != nil { + _ = w.watcher.Add(path) + } + w.Unlock() + } + + if filepath.Ext(info.Name()) != ".json" { + return nil + } + + adv, err := NewJetStreamAdvisoryConfigFromFile(path) + if err != nil { + return fmt.Errorf("could not create advisory from path: %s, error: %v", path, err) + } + + return w.am.Set(adv) + } +} + +func (w *jsAdvisoryFSWatcher) handleWatcherEvent(event fsnotify.Event) error { + path := event.Name + + switch { + case event.Has(fsnotify.Create): + return w.handleCreateEvent(path) + case event.Has(fsnotify.Write) && !event.Has(fsnotify.Remove): + return w.handleWriteEvent(path) + case event.Has(fsnotify.Remove): + return w.handleRemoveEvent(path) + } + return nil +} + +func (w *jsAdvisoryFSWatcher) handleCreateEvent(path string) error { + stat, err := os.Stat(path) + if err != nil { + return fmt.Errorf("could not stat advisory path %s: %s", path, err) + } + + // if a new directory was created, start advisory in dir + if stat.IsDir() { + return filepath.WalkDir(path, w.startAdvisoriesInDir()) + } + + // if not a directory and not a JSON, ignore + if filepath.Ext(stat.Name()) != ".json" { + return nil + } + + // handle as a write event + return w.handleWriteEvent(path) +} + +func (w *jsAdvisoryFSWatcher) handleWriteEvent(path string) error { + stat, err := os.Stat(path) + if err != nil { + return fmt.Errorf("could not stat advisory path: %s, error: %v", path, err) + } + + // if not a JSON file, ignore + if stat.IsDir() || filepath.Ext(stat.Name()) != ".json" { + return nil + } + + config, err := NewJetStreamAdvisoryConfigFromFile(path) + if err != nil { + return fmt.Errorf("could not create advisory from path: %s, error: %v", path, err) + } + + return w.am.Set(config) +} + +func (w *jsAdvisoryFSWatcher) handleRemoveEvent(path string) error { + var removeIDs []string + configMap := w.am.ConfigMap() + dir := strings.TrimSuffix(path, string(filepath.Separator)) + string(filepath.Separator) + for id := range configMap { + if id == path { + // file + removeIDs = append(removeIDs, id) + } else if strings.HasPrefix(id, dir) { + // directory + removeIDs = append(removeIDs, id) + } + } + + var err error + if len(removeIDs) > 0 { + for _, removeID := range removeIDs { + if removeErr := w.am.Delete(removeID); removeErr != nil { + err = removeErr + } + } + } + + return err } diff --git a/surveyor/jetstream_advisories_test.go b/surveyor/jetstream_advisories_test.go index 99dde7f..4db2ed4 100644 --- a/surveyor/jetstream_advisories_test.go +++ b/surveyor/jetstream_advisories_test.go @@ -2,12 +2,17 @@ package surveyor import ( "bytes" + "encoding/json" + "fmt" + "os" + "reflect" "testing" "time" + "github.com/nats-io/nats.go" + "github.com/nats-io/jsm.go" st "github.com/nats-io/nats-surveyor/test" - "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" ptu "github.com/prometheus/client_golang/prometheus/testutil" ) @@ -23,22 +28,44 @@ func TestJetStream_Load(t *testing.T) { Name: prometheus.BuildFQName("nats", "survey", "nats_reconnects"), Help: "Number of times the surveyor reconnected to the NATS cluster", }, []string{"name"}) + cp := newSurveyorConnPool(opt, reconnectCtr) - obs, err := NewJetStreamAdvisoryListener("testdata/goodjs/global.json", *opt, metrics, reconnectCtr) + config, err := NewJetStreamAdvisoryConfigFromFile("testdata/goodjs/global.json") if err != nil { - t.Fatalf("jetstream load error: %s", err) + t.Fatalf("advisory config error: %s", err) + } + adv, err := newJetStreamAdvisoryListener(config, cp, opt.Logger, metrics) + if err != nil { + t.Fatalf("advisory listener error: %s", err) + } + err = adv.Start() + if err != nil { + t.Fatalf("advisory start error: %s", err) } - obs.Stop() + adv.Stop() - _, err = NewJetStreamAdvisoryListener("testdata/badjs/missing.json", *opt, metrics, reconnectCtr) + _, err = NewJetStreamAdvisoryConfigFromFile("testdata/badjs/missing.json") if err.Error() != "open testdata/badjs/missing.json: no such file or directory" { t.Fatalf("jetstream load error: %s", err) } - _, err = NewJetStreamAdvisoryListener("testdata/badobs/bad.json", *opt, metrics, reconnectCtr) - if err.Error() != "invalid JetStream advisory configuration: testdata/badobs/bad.json: name is required" { + _, err = NewJetStreamAdvisoryConfigFromFile("testdata/badjs/bad.json") + if err.Error() != "invalid JetStream advisory config: testdata/badjs/bad.json: name is required" { t.Fatalf("jetstream load error: %s", err) } + + config, err = NewJetStreamAdvisoryConfigFromFile("testdata/badjs/badauth.json") + if err != nil { + t.Fatalf("observation config error: %s", err) + } + adv, err = newJetStreamAdvisoryListener(config, cp, opt.Logger, metrics) + if err != nil { + t.Fatalf("observation listener error: %s", err) + } + err = adv.Start() + if err.Error() != "nats connection failed for id: testdata/badjs/badauth.json, account name: testing, error: nats: Authorization Violation" { + t.Fatalf("observation load error does not match expected error: %s", err) + } } func TestJetStream_limitJSSubject(t *testing.T) { @@ -69,17 +96,21 @@ func TestJetStream_Handle(t *testing.T) { Name: prometheus.BuildFQName("nats", "survey", "nats_reconnects"), Help: "Number of times the surveyor reconnected to the NATS cluster", }, []string{"name"}) + cp := newSurveyorConnPool(opt, reconnectCtr) - obs, err := NewJetStreamAdvisoryListener("testdata/goodjs/global.json", *opt, metrics, reconnectCtr) + config, err := NewJetStreamAdvisoryConfigFromFile("testdata/goodjs/global.json") if err != nil { - t.Fatalf("jetstream load error: %s", err) + t.Fatalf("advisory config error: %s", err) } - defer obs.Stop() - - err = obs.Start() + adv, err := newJetStreamAdvisoryListener(config, cp, opt.Logger, metrics) + if err != nil { + t.Fatalf("advisory listener error: %s", err) + } + err = adv.Start() if err != nil { - t.Fatalf("jetstream failed to start: %s", err) + t.Fatalf("advisory start error: %s", err) } + defer adv.Stop() nc, err := nats.Connect(js.ClientURL(), nats.UseOldRequestStyle()) if err != nil { @@ -189,3 +220,417 @@ nats_jetstream_acknowledgement_deliveries{account="global",consumer="OUT",stream t.Fatalf("metrics failed: %s", err) } } + +func TestSurveyor_AdvisoriesFromFile(t *testing.T) { + js := st.NewJetStreamServer(t) + defer js.Shutdown() + + opts := getTestOptions() + opts.URLs = js.ClientURL() + opts.JetStreamConfigDir = "testdata/goodjs" + + s, err := NewSurveyor(opts) + if err != nil { + t.Fatalf("couldn't create surveyor: %v", err) + } + if err = s.Start(); err != nil { + t.Fatalf("start error: %v", err) + } + defer s.Stop() + + if ptu.ToFloat64(s.JetStreamAdvisoryManager().metrics.jsAdvisoriesGauge) != 1 { + t.Fatalf("process error: advisories not started") + } +} + +func TestSurveyor_Advisories(t *testing.T) { + js := st.NewJetStreamServer(t) + defer js.Shutdown() + + opts := getTestOptions() + opts.URLs = js.ClientURL() + + s, err := NewSurveyor(opts) + if err != nil { + t.Fatalf("couldn't create surveyor: %v", err) + } + if err = s.Start(); err != nil { + t.Fatalf("start error: %v", err) + } + defer s.Stop() + am := s.JetStreamAdvisoryManager() + + expectedAdvisories := make(map[string]*JSAdvisoryConfig) + advisories := []*JSAdvisoryConfig{ + { + ID: "a", + AccountName: "a", + Username: "a", + Password: "a", + }, + { + ID: "b", + AccountName: "b", + Username: "b", + Password: "b", + }, + { + ID: "c", + AccountName: "c", + Nkey: "../test/c.nkey", + }, + } + + advIDs := make([]string, 0) + for _, adv := range advisories { + err := am.Set(adv) + if err != nil { + t.Errorf("Unexpected error on advisory set: %s", err) + } + advIDs = append(advIDs, adv.ID) + expectedAdvisories[adv.ID] = adv + } + waitForAdvUpdate(t, am, expectedAdvisories) + + setAdvisory := &JSAdvisoryConfig{ + ID: advIDs[0], + AccountName: "aa", + Username: "a", + Password: "a", + } + expectedAdvisories[advIDs[0]] = setAdvisory + err = am.Set(setAdvisory) + if err != nil { + t.Errorf("Unexpected error on advisory set: %s", err) + } + waitForAdvUpdate(t, am, expectedAdvisories) + var found bool + advMap := am.ConfigMap() + for _, adv := range advMap { + if adv.AccountName == "aa" { + found = true + break + } + } + + if !found { + t.Errorf("Expected updated account name in advisory: %s", "aa") + } + deleteID := advIDs[0] + err = am.Delete(deleteID) + delete(expectedAdvisories, deleteID) + if err != nil { + t.Errorf("Unexpected error on advisory delete request: %s", err) + } + waitForAdvUpdate(t, am, expectedAdvisories) + + // advisory no longer exists + err = am.Delete(deleteID) + if err == nil { + t.Error("Expected error; got nil") + } + waitForAdvUpdate(t, am, expectedAdvisories) +} + +func TestSurveyor_AdvisoriesError(t *testing.T) { + js := st.NewJetStreamServer(t) + defer js.Shutdown() + + opts := getTestOptions() + opts.URLs = js.ClientURL() + + s, err := NewSurveyor(opts) + if err != nil { + t.Fatalf("couldn't create surveyor: %v", err) + } + if err = s.Start(); err != nil { + t.Fatalf("start error: %v", err) + } + defer s.Stop() + om := s.JetStreamAdvisoryManager() + if err != nil { + t.Fatalf("Error creating advisories manager: %s", err) + } + + // add invalid advisory (missing account name) + err = om.Set( + &JSAdvisoryConfig{ + ID: "id", + AccountName: "", + }, + ) + + if err == nil { + t.Errorf("Expected error; got nil") + } + + // valid advisory, no error + err = om.Set( + &JSAdvisoryConfig{ + ID: "id", + AccountName: "global", + }, + ) + + if err != nil { + t.Errorf("Expected no error; got: %s", err) + } + + // update error, invalid config + err = om.Set( + &JSAdvisoryConfig{ + ID: "id", + AccountName: "", + }, + ) + + if err == nil { + t.Errorf("Expected error; got nil") + } +} + +func waitForAdvUpdate(t *testing.T, am *JSAdvisoryManager, expectedAdvisories map[string]*JSAdvisoryConfig) { + t.Helper() + ticker := time.NewTicker(50 * time.Millisecond) + timeout := time.After(5 * time.Second) + defer ticker.Stop() +Outer: + for { + select { + case <-ticker.C: + advisoriesNum := ptu.ToFloat64(am.metrics.jsAdvisoriesGauge) + if advisoriesNum == float64(len(expectedAdvisories)) { + break Outer + } + case <-timeout: + advisoriesNum := ptu.ToFloat64(am.metrics.jsAdvisoriesGauge) + t.Fatalf("process error: invalid number of advisories; want: %d; got: %f\n", len(expectedAdvisories), advisoriesNum) + return + } + } + + existingAdvisories := am.ConfigMap() + if len(existingAdvisories) != len(expectedAdvisories) { + t.Fatalf("Unexpected number of advisories; want: %d; got: %d", len(expectedAdvisories), len(existingAdvisories)) + } + for _, existingAdvisory := range existingAdvisories { + obs, ok := expectedAdvisories[existingAdvisory.ID] + if !ok { + t.Fatalf("Missing advisory with ID: %s", existingAdvisory.ID) + } + if !reflect.DeepEqual(obs, existingAdvisory) { + t.Fatalf("Invalid advisory config; want: %+v; got: %+v", obs, existingAdvisory) + } + } +} + +func TestSurveyor_AdvisoriesWatcher(t *testing.T) { + js := st.NewJetStreamServer(t) + defer js.Shutdown() + + opts := getTestOptions() + opts.URLs = js.ClientURL() + + dirName := fmt.Sprintf("testdata/adv%d", time.Now().UnixNano()) + if err := os.Mkdir(dirName, 0o700); err != nil { + t.Fatalf("Error creating advisories dir: %s", err) + } + defer os.RemoveAll(dirName) + opts.JetStreamConfigDir = dirName + + s, err := NewSurveyor(opts) + if err != nil { + t.Fatalf("couldn't create surveyor: %v", err) + } + if err = s.Start(); err != nil { + t.Fatalf("start error: %v", err) + } + defer s.Stop() + time.Sleep(200 * time.Millisecond) + + am := s.JetStreamAdvisoryManager() + expectedAdvisories := make(map[string]*JSAdvisoryConfig) + + t.Run("write advisory file - create operation", func(t *testing.T) { + advConfig := &JSAdvisoryConfig{ + AccountName: "a", + Username: "a", + Password: "a", + } + advConfigJSON, err := json.Marshal(advConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + advPath := fmt.Sprintf("%s/create.json", dirName) + if err := os.WriteFile(advPath, advConfigJSON, 0o600); err != nil { + t.Fatalf("Error writing advisory config file: %s", err) + } + + advConfig.ID = advPath + expectedAdvisories[advPath] = advConfig + waitForAdvUpdate(t, am, expectedAdvisories) + }) + + t.Run("first create then write to file - write operation", func(t *testing.T) { + advConfig := &JSAdvisoryConfig{ + AccountName: "b", + Username: "b", + Password: "b", + } + advConfigJSON, err := json.Marshal(advConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + advPath := fmt.Sprintf("%s/write.json", dirName) + f, err := os.Create(advPath) + if err != nil { + t.Fatalf("Error writing advisory config file: %s", err) + } + if err := f.Close(); err != nil { + t.Fatalf("Error closing file: %s", err) + } + time.Sleep(200 * time.Millisecond) + if err := os.WriteFile(advPath, advConfigJSON, 0o600); err != nil { + t.Fatalf("Error writing to file: %s", err) + } + + advConfig.ID = advPath + expectedAdvisories[advPath] = advConfig + waitForAdvUpdate(t, am, expectedAdvisories) + }) + + t.Run("create advisories in subfolder", func(t *testing.T) { + advConfig := &JSAdvisoryConfig{ + AccountName: "c", + Credentials: "../test/c.nkey", + } + advConfigJSON, err := json.Marshal(advConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + + if err := os.Mkdir(fmt.Sprintf("%s/subdir", dirName), 0o700); err != nil { + t.Fatalf("Error creating subdirectory: %s", err) + } + time.Sleep(100 * time.Millisecond) + + advPath := fmt.Sprintf("%s/subdir/subadv.json", dirName) + + err = os.WriteFile(advPath, advConfigJSON, 0o600) + if err != nil { + t.Fatalf("Error writing advisory config file: %s", err) + } + + advConfig.ID = advPath + expectedAdvisories[advPath] = advConfig + waitForAdvUpdate(t, am, expectedAdvisories) + + advConfig = &JSAdvisoryConfig{ + AccountName: "d", + Username: "d", + Password: "d", + } + advConfigJSON, err = json.Marshal(advConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + advPath = fmt.Sprintf("%s/subdir/abc.json", dirName) + + if err := os.WriteFile(advPath, advConfigJSON, 0o600); err != nil { + t.Fatalf("Error writing advisory config file: %s", err) + } + + advConfig.ID = advPath + expectedAdvisories[advPath] = advConfig + waitForAdvUpdate(t, am, expectedAdvisories) + + advConfig = &JSAdvisoryConfig{ + AccountName: "global", + } + advConfigJSON, err = json.Marshal(advConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + if err := os.Mkdir(fmt.Sprintf("%s/subdir/nested", dirName), 0o700); err != nil { + t.Fatalf("Error creating subdirectory: %s", err) + } + time.Sleep(100 * time.Millisecond) + + advPath = fmt.Sprintf("%s/subdir/nested/nested.json", dirName) + err = os.WriteFile(advPath, advConfigJSON, 0o600) + if err != nil { + t.Fatalf("Error writing advisory config file: %s", err) + } + + advConfig.ID = advPath + expectedAdvisories[advPath] = advConfig + waitForAdvUpdate(t, am, expectedAdvisories) + }) + + t.Run("update advisories", func(t *testing.T) { + advConfig := &JSAdvisoryConfig{ + AccountName: "bb", + Username: "b", + Password: "b", + } + advConfigJSON, err := json.Marshal(advConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + + advPath := fmt.Sprintf("%s/write.json", dirName) + if err := os.WriteFile(advPath, advConfigJSON, 0o600); err != nil { + t.Fatalf("Error writing to file: %s", err) + } + + advConfig.ID = advPath + expectedAdvisories[advPath] = advConfig + waitForAdvUpdate(t, am, expectedAdvisories) + + // update file with invalid JSON - existing advisory should not be impacted + if err := os.WriteFile(advPath, []byte("abc"), 0o600); err != nil { + t.Fatalf("Error writing to file: %s", err) + } + time.Sleep(100 * time.Millisecond) + waitForAdvUpdate(t, am, expectedAdvisories) + }) + + t.Run("remove advisories", func(t *testing.T) { + // remove single advisory + advPath := fmt.Sprintf("%s/create.json", dirName) + if err := os.Remove(advPath); err != nil { + t.Fatalf("Error removing advisory config: %s", err) + } + delete(expectedAdvisories, advPath) + waitForAdvUpdate(t, am, expectedAdvisories) + + // remove whole subfolder + if err := os.RemoveAll(fmt.Sprintf("%s/subdir", dirName)); err != nil { + t.Fatalf("Error removing subdirectory: %s", err) + } + + delete(expectedAdvisories, fmt.Sprintf("%s/subdir/subadv.json", dirName)) + delete(expectedAdvisories, fmt.Sprintf("%s/subdir/abc.json", dirName)) + delete(expectedAdvisories, fmt.Sprintf("%s/subdir/nested/nested.json", dirName)) + waitForAdvUpdate(t, am, expectedAdvisories) + + advConfig := &JSAdvisoryConfig{ + AccountName: "aa", + Username: "a", + Password: "a", + } + advConfigJSON, err := json.Marshal(advConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + + advPath = fmt.Sprintf("%s/another.json", dirName) + if err := os.WriteFile(advPath, advConfigJSON, 0o600); err != nil { + t.Fatalf("Error writing advisory config file: %s", err) + } + + advConfig.ID = advPath + expectedAdvisories[advPath] = advConfig + waitForAdvUpdate(t, am, expectedAdvisories) + }) +} diff --git a/surveyor/observation.go b/surveyor/observation.go index fa9ae6d..820a012 100644 --- a/surveyor/observation.go +++ b/surveyor/observation.go @@ -24,10 +24,11 @@ import ( "strings" "sync" + "github.com/nats-io/nats.go" + "github.com/fsnotify/fsnotify" "github.com/nats-io/jsm.go" "github.com/nats-io/jsm.go/api/server/metric" - "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) @@ -114,34 +115,35 @@ func NewServiceObservationMetrics(registry *prometheus.Registry, constLabels pro return metrics } -// ServiceObsListener listens for observations from nats service latency checks. -// -// Deprecated: ServiceObsListener will be unexported in a future release -// Use ServiceObsConfig and Surveyor.ServiceObservationManager instead -type ServiceObsListener struct { - nc *nats.Conn - logger *logrus.Logger - config *ServiceObsConfig - metrics *ServiceObsMetrics - sopts *Options -} - // ServiceObsConfig is used to configure service observations type ServiceObsConfig struct { - ID string `json:"id"` + // unique identifier + ID string `json:"id"` + + // service settings ServiceName string `json:"name"` Topic string `json:"topic"` + + // connection options + JWT string `json:"jwt"` + Seed string `json:"seed"` Credentials string `json:"credential"` Nkey string `json:"nkey"` + Token string `json:"token"` + Username string `json:"username"` + Password string `json:"password"` + TLSCA string `json:"tls_ca"` + TLSCert string `json:"tls_cert"` + TLSKey string `json:"tls_key"` } // Validate is used to validate a ServiceObsConfig func (o *ServiceObsConfig) Validate() error { - var errs []string if o == nil { return fmt.Errorf("service observation config cannot be nil") } + var errs []string if o.ID == "" { errs = append(errs, "id is required") } @@ -154,23 +156,6 @@ func (o *ServiceObsConfig) Validate() error { errs = append(errs, "topic is required") } - switch { - case o.Credentials == "" && o.Nkey == "": - errs = append(errs, "jwt or nkey credentials is required") - case o.Credentials != "" && o.Nkey != "": - errs = append(errs, "both jwt and nkey credentials found, only one can be used") - case o.Credentials != "": - _, err := os.Stat(o.Credentials) - if err != nil { - errs = append(errs, fmt.Sprintf("invalid credential file: %s", err)) - } - case o.Nkey != "": - _, err := os.Stat(o.Nkey) - if err != nil { - errs = append(errs, fmt.Sprintf("invalid nkey file: %s", err)) - } - } - if len(errs) == 0 { return nil } @@ -208,69 +193,85 @@ func NewServiceObservationConfigFromFile(f string) (*ServiceObsConfig, error) { return obs, nil } -// NewServiceObservation creates a new service observation listener from a JSON file. -// -// Deprecated: ServiceObsListener will be unexported in a future release -// Use NewServiceObservationConfigFromFile and Surveyor.ServiceObservationManager instead -func NewServiceObservation(f string, sopts Options, metrics *ServiceObsMetrics, reconnectCtr *prometheus.CounterVec) (*ServiceObsListener, error) { - serviceObs, err := NewServiceObservationConfigFromFile(f) - if err != nil { - return nil, err - } - - obs, err := newServiceObservationListener(serviceObs, sopts, metrics, reconnectCtr) - if err != nil { - return nil, err - } - - return obs, nil +// serviceObsListener listens for observations from nats service latency checks +type serviceObsListener struct { + sync.Mutex + config *ServiceObsConfig + cp *natsConnPool + logger *logrus.Logger + metrics *ServiceObsMetrics + pc *pooledNatsConn + sub *nats.Subscription } -func newServiceObservationListener(config *ServiceObsConfig, sopts Options, metrics *ServiceObsMetrics, reconnectCtr *prometheus.CounterVec) (*ServiceObsListener, error) { +func newServiceObservationListener(config *ServiceObsConfig, cp *natsConnPool, logger *logrus.Logger, metrics *ServiceObsMetrics) (*serviceObsListener, error) { err := config.Validate() if err != nil { return nil, fmt.Errorf("invalid service observation config for id: %s, service name: %s, error: %v", config.ID, config.ServiceName, err) } - sopts.Name = fmt.Sprintf("%s (observing %s)", sopts.Name, config.ServiceName) - sopts.Credentials = config.Credentials - sopts.Nkey = config.Nkey - nc, err := connect(&sopts, reconnectCtr) - if err != nil { - return nil, fmt.Errorf("nats connection failed: %s", err) - } - - return &ServiceObsListener{ - nc: nc, - logger: sopts.Logger, + return &serviceObsListener{ config: config, + cp: cp, + logger: logger, metrics: metrics, - sopts: &sopts, }, nil } -// Start starts listening for observations. -func (o *ServiceObsListener) Start() error { - _, err := o.nc.Subscribe(o.config.Topic, o.observationHandler) +func (o *serviceObsListener) natsContext() *natsContext { + natsCtx := &natsContext{ + JWT: o.config.JWT, + Seed: o.config.Seed, + Credentials: o.config.Credentials, + Nkey: o.config.Nkey, + Token: o.config.Token, + Username: o.config.Username, + Password: o.config.Password, + TLSCA: o.config.TLSCA, + TLSCert: o.config.TLSCert, + TLSKey: o.config.TLSKey, + } + + // legacy Credentials field + if natsCtx.Credentials == "" && o.config.Credentials != "" { + natsCtx.Credentials = o.config.Credentials + o.logger.Warnf("deprecated service observation config field 'credential', use 'creds' instead for id: %s, service name: %s", o.config.ID, o.config.ServiceName) + } + return natsCtx +} + +// Start starts listening for observations +func (o *serviceObsListener) Start() error { + o.Lock() + defer o.Unlock() + if o.pc != nil { + // already started + return nil + } + + pc, err := o.cp.Get(o.natsContext()) if err != nil { - return fmt.Errorf("could not subscribe to service observation topic for id: %s, service name: %s, topic: %s, error: %v", o.config.ID, o.config.ServiceName, o.config.Topic, err) + return fmt.Errorf("nats connection failed for id: %s, service name: %s, error: %v", o.config.ID, o.config.ServiceName, err) } - err = o.nc.Flush() + + sub, err := pc.nc.Subscribe(o.config.Topic, o.observationHandler) if err != nil { - return err + pc.ReturnToPool() + return fmt.Errorf("could not subscribe to service observation topic for id: %s, service name: %s, topic: %s, error: %v", o.config.ID, o.config.ServiceName, o.config.Topic, err) } + o.pc = pc + o.sub = sub o.metrics.observationsGauge.Inc() o.logger.Infof("started service observation for id: %s, service name: %s, topic: %s", o.config.ID, o.config.ServiceName, o.config.Topic) - return nil } -func (o *ServiceObsListener) observationHandler(m *nats.Msg) { +func (o *serviceObsListener) observationHandler(m *nats.Msg) { kind, obs, err := jsm.ParseEvent(m.Data) if err != nil { o.metrics.invalidObservationsReceived.WithLabelValues(o.config.ServiceName).Inc() - o.logger.Warnf("unparsable service observation received for id: %s, service name: %s, subject: %s, error: %v, data: %q", o.config.ID, o.config.ServiceName, m.Subject, err, m.Data) + o.logger.Warnf("unparsable service observation received for id: %s, service name: %s, error: %v, data: %q", o.config.ID, o.config.ServiceName, err, m.Data) return } @@ -291,86 +292,91 @@ func (o *ServiceObsListener) observationHandler(m *nats.Msg) { default: o.metrics.invalidObservationsReceived.WithLabelValues(o.config.ServiceName).Inc() - o.logger.Warnf("unsupported service observation received for id: %s, service name: %s, subject: %s, kind: %s", o.config.ID, o.config.ServiceName, m.Subject, kind) + o.logger.Warnf("unsupported service observation received for id: %s, service name: %s, kind: %s", o.config.ID, o.config.ServiceName, kind) return } } -// Stop closes the connection to the network. -func (o *ServiceObsListener) Stop() { +// Stop stops listening for observations +func (o *serviceObsListener) Stop() { + o.Lock() + defer o.Unlock() + if o.pc == nil { + // already stopped + return + } + + if o.sub != nil { + _ = o.sub.Unsubscribe() + o.sub = nil + } + o.metrics.observationsGauge.Dec() - o.nc.Close() + o.pc.ReturnToPool() + o.pc = nil } -// ServiceObsManager exposes methods to operate on service observations. +// ServiceObsManager exposes methods to operate on service observations type ServiceObsManager struct { sync.Mutex - listenerMap map[string]*ServiceObsListener - logger *logrus.Logger - metrics *ServiceObsMetrics - reconnectCtr *prometheus.CounterVec - sopts Options - running bool - watcherStopChMap map[string]chan struct{} + cp *natsConnPool + listenerMap map[string]*serviceObsListener + logger *logrus.Logger + metrics *ServiceObsMetrics } -// newServiceObservationManager creates a ServiceObsManager, allowing for adding/deleting service observations to the surveyor. -func newServiceObservationManager(logger *logrus.Logger, sopts Options, metrics *ServiceObsMetrics, reconnectCtr *prometheus.CounterVec) *ServiceObsManager { +// newServiceObservationManager creates a ServiceObsManager for managing Service Observations +func newServiceObservationManager(cp *natsConnPool, logger *logrus.Logger, metrics *ServiceObsMetrics) *ServiceObsManager { return &ServiceObsManager{ - logger: logger, - sopts: sopts, - metrics: metrics, - reconnectCtr: reconnectCtr, - listenerMap: map[string]*ServiceObsListener{}, - watcherStopChMap: map[string]chan struct{}{}, - running: false, + cp: cp, + logger: logger, + metrics: metrics, } } func (om *ServiceObsManager) start() { om.Lock() defer om.Unlock() - if om.running { + if om.listenerMap != nil { + // already started return } - om.running = true + om.listenerMap = map[string]*serviceObsListener{} } // IsRunning returns true if the observation manager is running or false if it is stopped func (om *ServiceObsManager) IsRunning() bool { om.Lock() defer om.Unlock() - return om.running + return om.listenerMap != nil } func (om *ServiceObsManager) stop() { om.Lock() defer om.Unlock() - if !om.running { + if om.listenerMap == nil { + // already stopped return } for _, obs := range om.listenerMap { obs.Stop() } - om.listenerMap = map[string]*ServiceObsListener{} - - for _, watcherStopCh := range om.watcherStopChMap { - watcherStopCh <- struct{}{} - } - om.watcherStopChMap = map[string]chan struct{}{} - om.metrics.observationsGauge.Set(0) - om.running = false + om.listenerMap = nil } -// ConfigMap returns a map of id:*ServiceObsConfig for all running observations. +// ConfigMap returns a map of id:*ServiceObsConfig for all running observations func (om *ServiceObsManager) ConfigMap() map[string]*ServiceObsConfig { om.Lock() defer om.Unlock() obsMap := make(map[string]*ServiceObsConfig, len(om.listenerMap)) + if om.listenerMap == nil { + return obsMap + } + for id, obs := range om.listenerMap { // copy so that internal references cannot be changed obsMap[id] = obs.config.copy() @@ -392,7 +398,7 @@ func (om *ServiceObsManager) Set(config *ServiceObsConfig) error { config = config.copy() om.Lock() - if !om.running { + if om.listenerMap == nil { om.Unlock() return fmt.Errorf("observation manager is stopped; could not set observation for id: %s, service name: %s", config.ID, config.ServiceName) } @@ -404,7 +410,7 @@ func (om *ServiceObsManager) Set(config *ServiceObsConfig) error { return nil } - obs, err := newServiceObservationListener(config, om.sopts, om.metrics, om.reconnectCtr) + obs, err := newServiceObservationListener(config, om.cp, om.logger, om.metrics) if err != nil { return fmt.Errorf("could not set observation for id: %s, service name: %s, error: %v", config.ID, config.ServiceName, err) } @@ -414,7 +420,7 @@ func (om *ServiceObsManager) Set(config *ServiceObsConfig) error { } om.Lock() - if !om.running { + if om.listenerMap == nil { om.Unlock() obs.Stop() return fmt.Errorf("observation manager is stopped; could not set observation for id: %s, service name: %s", config.ID, config.ServiceName) @@ -432,9 +438,9 @@ func (om *ServiceObsManager) Set(config *ServiceObsConfig) error { // Delete deletes existing observations with provided ID func (om *ServiceObsManager) Delete(id string) error { om.Lock() - if !om.running { + if om.listenerMap == nil { om.Unlock() - return fmt.Errorf("observation manager is stopped; could not delete observation id: %s", id) + return fmt.Errorf("could not delete observation id: %s: observation manager is stopped", id) } existingObs, found := om.listenerMap[id] @@ -454,8 +460,8 @@ type serviceObsFSWatcher struct { sync.Mutex logger *logrus.Logger om *ServiceObsManager - watcher *fsnotify.Watcher stopCh chan struct{} + watcher *fsnotify.Watcher } func newServiceObservationFSWatcher(logger *logrus.Logger, om *ServiceObsManager) *serviceObsFSWatcher { diff --git a/surveyor/observation_test.go b/surveyor/observation_test.go index f07c231..4085804 100644 --- a/surveyor/observation_test.go +++ b/surveyor/observation_test.go @@ -16,6 +16,9 @@ package surveyor import ( "bytes" "encoding/json" + "fmt" + "os" + "reflect" "testing" "time" @@ -36,59 +39,43 @@ func TestServiceObservation_Load(t *testing.T) { Name: prometheus.BuildFQName("nats", "survey", "nats_reconnects"), Help: "Number of times the surveyor reconnected to the NATS cluster", }, []string{"name"}) + cp := newSurveyorConnPool(opt, reconnectCtr) - obs, err := NewServiceObservation("testdata/goodobs/good.json", *opt, metrics, reconnectCtr) + config, err := NewServiceObservationConfigFromFile("testdata/goodobs/good.json") if err != nil { - t.Fatalf("observation load error: %s", err) - } - obs.Stop() - - _, err = NewServiceObservation("testdata/badobs/missing.json", *opt, metrics, reconnectCtr) - if err.Error() != "open testdata/badobs/missing.json: no such file or directory" { - t.Fatalf("observation load error: %s", err) + t.Fatalf("observation config error: %s", err) } - - _, err = NewServiceObservation("testdata/badobs/bad.json", *opt, metrics, reconnectCtr) - if err.Error() != "invalid service observation config: testdata/badobs/bad.json: name is required, topic is required, jwt or nkey credentials is required" { - t.Fatalf("observation load error: %s", err) - } - - _, err = NewServiceObservation("testdata/badobs/badauth.json", *opt, metrics, reconnectCtr) - if err.Error() != "nats connection failed: nats: Authorization Violation" { - t.Fatalf("observation load error: %s", err) + obs, err := newServiceObservationListener(config, cp, opt.Logger, metrics) + if err != nil { + t.Fatalf("observation listener error: %s", err) } -} - -func TestServiceObservation_LoadDynamically(t *testing.T) { - sc := st.NewSuperCluster(t) - defer sc.Shutdown() - - opt := getTestOptions() - metrics := NewServiceObservationMetrics(prometheus.NewRegistry(), nil) - reconnectCtr := prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: prometheus.BuildFQName("nats", "survey", "nats_reconnects"), - Help: "Number of times the surveyor reconnected to the NATS cluster", - }, []string{"name"}) - - obs, err := NewServiceObservation("testdata/goodobs/good.json", *opt, metrics, reconnectCtr) + err = obs.Start() if err != nil { - t.Fatalf("observation load error: %s", err) + t.Fatalf("observation start error: %s", err) } obs.Stop() - _, err = NewServiceObservation("testdata/badobs/missing.json", *opt, metrics, reconnectCtr) + _, err = NewServiceObservationConfigFromFile("testdata/badobs/missing.json") if err.Error() != "open testdata/badobs/missing.json: no such file or directory" { t.Fatalf("observation load error: %s", err) } - _, err = NewServiceObservation("testdata/badobs/bad.json", *opt, metrics, reconnectCtr) - if err.Error() != "invalid service observation config: testdata/badobs/bad.json: name is required, topic is required, jwt or nkey credentials is required" { + _, err = NewServiceObservationConfigFromFile("testdata/badobs/bad.json") + if err.Error() != "invalid service observation config: testdata/badobs/bad.json: name is required, topic is required" { t.Fatalf("observation load error: %s", err) } - _, err = NewServiceObservation("testdata/badobs/badauth.json", *opt, metrics, reconnectCtr) - if err.Error() != "nats connection failed: nats: Authorization Violation" { - t.Fatalf("observation load error: %s", err) + config, err = NewServiceObservationConfigFromFile("testdata/badobs/badauth.json") + if err != nil { + t.Fatalf("observation config error: %s", err) + } + obs, err = newServiceObservationListener(config, cp, opt.Logger, metrics) + if err != nil { + t.Fatalf("observation listener error: %s", err) + } + err = obs.Start() + if err.Error() != "nats connection failed for id: testdata/badobs/badauth.json, service name: testing, error: nats: Authorization Violation" { + t.Fatalf("observation load error does not match expected error: %s", err) } } @@ -102,15 +89,19 @@ func TestServiceObservation_Handle(t *testing.T) { Name: prometheus.BuildFQName("nats", "survey", "nats_reconnects"), Help: "Number of times the surveyor reconnected to the NATS cluster", }, []string{"name"}) + cp := newSurveyorConnPool(opt, reconnectCtr) - obs, err := NewServiceObservation("testdata/goodobs/good.json", *opt, metrics, reconnectCtr) + config, err := NewServiceObservationConfigFromFile("testdata/goodobs/good.json") if err != nil { - t.Fatalf("observation load error: %s", err) + t.Fatalf("observation config error: %s", err) + } + obs, err := newServiceObservationListener(config, cp, opt.Logger, metrics) + if err != nil { + t.Fatalf("observation listener error: %s", err) } - err = obs.Start() if err != nil { - t.Fatalf("obs could not start: %s", err) + t.Fatalf("observation start error: %s", err) } defer obs.Stop() @@ -246,3 +237,425 @@ nats_latency_observation_status_count{service="testing",status="500"} 5 t.Fatalf("Status counter: %s", err) } } + +func TestSurveyor_ObservationsFromFile(t *testing.T) { + sc := st.NewSuperCluster(t) + defer sc.Shutdown() + + opts := getTestOptions() + opts.ObservationConfigDir = "testdata/goodobs" + + s, err := NewSurveyor(opts) + if err != nil { + t.Fatalf("couldn't create surveyor: %v", err) + } + if err = s.Start(); err != nil { + t.Fatalf("start error: %v", err) + } + defer s.Stop() + + if ptu.ToFloat64(s.ServiceObservationManager().metrics.observationsGauge) != 1 { + t.Fatalf("process error: observations not started") + } +} + +func TestSurveyor_Observations(t *testing.T) { + sc := st.NewSuperCluster(t) + defer sc.Shutdown() + + opts := getTestOptions() + + s, err := NewSurveyor(opts) + if err != nil { + t.Fatalf("couldn't create surveyor: %v", err) + } + if err = s.Start(); err != nil { + t.Fatalf("start error: %v", err) + } + defer s.Stop() + om := s.ServiceObservationManager() + + expectedObservations := make(map[string]*ServiceObsConfig) + observations := []*ServiceObsConfig{ + { + ID: "srv1", + ServiceName: "srv1", + Topic: "testing.topic", + Credentials: "../test/myuser.creds", + Nkey: "", + }, + { + ID: "srv2", + ServiceName: "srv2", + Topic: "testing.topic", + Credentials: "../test/myuser.creds", + }, + { + ID: "srv3", + ServiceName: "srv3", + Topic: "testing.topic", + JWT: "eyJ0eXAiOiJqd3QiLCJhbGciOiJlZDI1NTE5In0.eyJqdGkiOiJFSkZHSjZPSDVFM1FXVk5GRUpRVEVLRkZXNDZFN1RISDVTQkhXSDMyQ1pIUUg1S1g3NVRRIiwiaWF0IjoxNTcwNDg3OTEzLCJpc3MiOiJBRDZEUEFUS1laTkFCNk01V0hJUTZPWFRORFRQQldMRk02TEU3TVBNTVNUVTdGSE9OTUNTUlJWVCIsIm5hbWUiOiJteXVzZXIiLCJzdWIiOiJVRDNPR1BPSVJUMjJVQUo3Nk9EUjJDRVFST0RHT1g2UVpZUldIMkw2Tk40VzRaNEhPNUpZTkNYMiIsInR5cGUiOiJ1c2VyIiwibmF0cyI6eyJwdWIiOnsiYWxsb3ciOlsiXHUwMDNlIl19LCJzdWIiOnsiYWxsb3ciOlsiXHUwMDNlIl19fX0.zmWzJIC8113VpwHjyJeg8gmOj1rceUIqvfFBlRFq62UBB08PjCe8yYjfWl-J_Enf8xGnv-ipvtEPxOxblkA8DQ", + Seed: "SUAEVSBKQ25JOR3JVVFZKQKZ7WGCYNEGYDJK7US76D2KUXNQSMK57SW2JU", + }, + } + + obsIDs := make([]string, 0) + for _, obs := range observations { + err := om.Set(obs) + if err != nil { + t.Errorf("Unexpected error on observation set: %s", err) + } + obsIDs = append(obsIDs, obs.ID) + expectedObservations[obs.ID] = obs + } + waitForObsUpdate(t, om, expectedObservations) + + setObservation := &ServiceObsConfig{ + ID: obsIDs[0], + ServiceName: "srv4", + Topic: "testing_updated.topic", + Credentials: "../test/myuser.creds", + } + expectedObservations[obsIDs[0]] = setObservation + err = om.Set(setObservation) + if err != nil { + t.Errorf("Unexpected error on observation set: %s", err) + } + waitForObsUpdate(t, om, expectedObservations) + var found bool + obsMap := om.ConfigMap() + for _, obs := range obsMap { + if obs.ServiceName == "srv4" { + found = true + break + } + } + + if !found { + t.Errorf("Expected updated service name in observations: %s", "srv4") + } + deleteID := obsIDs[0] + err = om.Delete(deleteID) + delete(expectedObservations, deleteID) + if err != nil { + t.Errorf("Unexpected error on observation delete request: %s", err) + } + waitForObsUpdate(t, om, expectedObservations) + + // observation no longer exists + err = om.Delete(deleteID) + if err == nil { + t.Error("Expected error; got nil") + } + waitForObsUpdate(t, om, expectedObservations) +} + +func TestSurveyor_ObservationsError(t *testing.T) { + sc := st.NewSuperCluster(t) + defer sc.Shutdown() + + opts := getTestOptions() + + s, err := NewSurveyor(opts) + if err != nil { + t.Fatalf("couldn't create surveyor: %v", err) + } + if err = s.Start(); err != nil { + t.Fatalf("start error: %v", err) + } + defer s.Stop() + om := s.ServiceObservationManager() + if err != nil { + t.Fatalf("Error creating observations manager: %s", err) + } + + // add invalid observation (missing service name) + err = om.Set( + &ServiceObsConfig{ + ID: "id", + ServiceName: "", + Topic: "testing.topic", + Credentials: "../test/myuser.creds", + }, + ) + + if err == nil { + t.Errorf("Expected error; got nil") + } + + // valid observation, no error + err = om.Set( + &ServiceObsConfig{ + ID: "id", + ServiceName: "srv", + Topic: "testing.topic", + Credentials: "../test/myuser.creds", + }, + ) + + if err != nil { + t.Errorf("Expected no error; got: %s", err) + } + + // update error, invalid config + err = om.Set( + &ServiceObsConfig{ + ID: "srv", + ServiceName: "srv", + Topic: "", + Credentials: "../test/myuser.creds", + }, + ) + + if err == nil { + t.Errorf("Expected error; got nil") + } +} + +func waitForObsUpdate(t *testing.T, om *ServiceObsManager, expectedObservations map[string]*ServiceObsConfig) { + t.Helper() + ticker := time.NewTicker(50 * time.Millisecond) + timeout := time.After(5 * time.Second) + defer ticker.Stop() +Outer: + for { + select { + case <-ticker.C: + observationsNum := ptu.ToFloat64(om.metrics.observationsGauge) + if observationsNum == float64(len(expectedObservations)) { + break Outer + } + case <-timeout: + observationsNum := ptu.ToFloat64(om.metrics.observationsGauge) + t.Fatalf("process error: invalid number of observations; want: %d; got: %f\n", len(expectedObservations), observationsNum) + return + } + } + + existingObservations := om.ConfigMap() + if len(existingObservations) != len(expectedObservations) { + t.Fatalf("Unexpected number of observations; want: %d; got: %d", len(expectedObservations), len(existingObservations)) + } + for _, existingObservation := range existingObservations { + obs, ok := expectedObservations[existingObservation.ID] + if !ok { + t.Fatalf("Missing observation with ID: %s", existingObservation.ID) + } + if !reflect.DeepEqual(obs, existingObservation) { + t.Fatalf("Invalid observation config; want: %+v; got: %+v", obs, existingObservation) + } + } +} + +func TestSurveyor_ObservationsWatcher(t *testing.T) { + sc := st.NewSuperCluster(t) + defer sc.Shutdown() + + opts := getTestOptions() + + dirName := fmt.Sprintf("testdata/obs%d", time.Now().UnixNano()) + if err := os.Mkdir(dirName, 0o700); err != nil { + t.Fatalf("Error creating observations dir: %s", err) + } + defer os.RemoveAll(dirName) + opts.ObservationConfigDir = dirName + + s, err := NewSurveyor(opts) + if err != nil { + t.Fatalf("couldn't create surveyor: %v", err) + } + if err = s.Start(); err != nil { + t.Fatalf("start error: %v", err) + } + defer s.Stop() + time.Sleep(200 * time.Millisecond) + + om := s.ServiceObservationManager() + expectedObservations := make(map[string]*ServiceObsConfig) + + t.Run("write observation file - create operation", func(t *testing.T) { + obsConfig := &ServiceObsConfig{ + ServiceName: "testing1", + Topic: "testing1.topic", + Credentials: "../test/myuser.creds", + } + obsConfigJSON, err := json.Marshal(obsConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + obsPath := fmt.Sprintf("%s/create.json", dirName) + if err := os.WriteFile(obsPath, obsConfigJSON, 0o600); err != nil { + t.Fatalf("Error writing observation config file: %s", err) + } + + obsConfig.ID = obsPath + expectedObservations[obsPath] = obsConfig + waitForObsUpdate(t, om, expectedObservations) + }) + + t.Run("first create then write to file - write operation", func(t *testing.T) { + obsConfig := &ServiceObsConfig{ + ServiceName: "testing2", + Topic: "testing2.topic", + Credentials: "../test/myuser.creds", + } + obsConfigJSON, err := json.Marshal(obsConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + obsPath := fmt.Sprintf("%s/write.json", dirName) + f, err := os.Create(obsPath) + if err != nil { + t.Fatalf("Error writing observation config file: %s", err) + } + if err := f.Close(); err != nil { + t.Fatalf("Error closing file: %s", err) + } + time.Sleep(200 * time.Millisecond) + if err := os.WriteFile(obsPath, obsConfigJSON, 0o600); err != nil { + t.Fatalf("Error writing to file: %s", err) + } + + obsConfig.ID = obsPath + expectedObservations[obsPath] = obsConfig + waitForObsUpdate(t, om, expectedObservations) + }) + + t.Run("create observations in subfolder", func(t *testing.T) { + obsConfig := &ServiceObsConfig{ + ServiceName: "testing3", + Topic: "testing3.topic", + Credentials: "../test/myuser.creds", + } + obsConfigJSON, err := json.Marshal(obsConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + + if err := os.Mkdir(fmt.Sprintf("%s/subdir", dirName), 0o700); err != nil { + t.Fatalf("Error creating subdirectory: %s", err) + } + time.Sleep(100 * time.Millisecond) + + obsPath := fmt.Sprintf("%s/subdir/subobs.json", dirName) + + err = os.WriteFile(obsPath, obsConfigJSON, 0o600) + if err != nil { + t.Fatalf("Error writing observation config file: %s", err) + } + + obsConfig.ID = obsPath + expectedObservations[obsPath] = obsConfig + waitForObsUpdate(t, om, expectedObservations) + + obsConfig = &ServiceObsConfig{ + ServiceName: "testing4", + Topic: "testing4.topic", + Credentials: "../test/myuser.creds", + } + obsConfigJSON, err = json.Marshal(obsConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + obsPath = fmt.Sprintf("%s/subdir/abc.json", dirName) + + if err := os.WriteFile(obsPath, obsConfigJSON, 0o600); err != nil { + t.Fatalf("Error writing observation config file: %s", err) + } + + obsConfig.ID = obsPath + expectedObservations[obsPath] = obsConfig + waitForObsUpdate(t, om, expectedObservations) + + obsConfig = &ServiceObsConfig{ + ServiceName: "testing5", + Topic: "testing5.topic", + Credentials: "../test/myuser.creds", + } + obsConfigJSON, err = json.Marshal(obsConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + if err := os.Mkdir(fmt.Sprintf("%s/subdir/nested", dirName), 0o700); err != nil { + t.Fatalf("Error creating subdirectory: %s", err) + } + time.Sleep(100 * time.Millisecond) + + obsPath = fmt.Sprintf("%s/subdir/nested/nested.json", dirName) + err = os.WriteFile(obsPath, obsConfigJSON, 0o600) + if err != nil { + t.Fatalf("Error writing observation config file: %s", err) + } + + obsConfig.ID = obsPath + expectedObservations[obsPath] = obsConfig + waitForObsUpdate(t, om, expectedObservations) + }) + + t.Run("update observations", func(t *testing.T) { + obsConfig := &ServiceObsConfig{ + ServiceName: "testing_updated", + Topic: "testing_updated.topic", + Credentials: "../test/myuser.creds", + } + obsConfigJSON, err := json.Marshal(obsConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + + obsPath := fmt.Sprintf("%s/write.json", dirName) + if err := os.WriteFile(obsPath, obsConfigJSON, 0o600); err != nil { + t.Fatalf("Error writing to file: %s", err) + } + + obsConfig.ID = obsPath + expectedObservations[obsPath] = obsConfig + waitForObsUpdate(t, om, expectedObservations) + + // update file with invalid JSON - existing observation should not be impacted + if err := os.WriteFile(obsPath, []byte("abc"), 0o600); err != nil { + t.Fatalf("Error writing to file: %s", err) + } + time.Sleep(100 * time.Millisecond) + waitForObsUpdate(t, om, expectedObservations) + }) + + t.Run("remove observations", func(t *testing.T) { + // remove single observation + obsPath := fmt.Sprintf("%s/create.json", dirName) + if err := os.Remove(obsPath); err != nil { + t.Fatalf("Error removing observation config: %s", err) + } + delete(expectedObservations, obsPath) + waitForObsUpdate(t, om, expectedObservations) + + // remove whole subfolder + if err := os.RemoveAll(fmt.Sprintf("%s/subdir", dirName)); err != nil { + t.Fatalf("Error removing subdirectory: %s", err) + } + + delete(expectedObservations, fmt.Sprintf("%s/subdir/subobs.json", dirName)) + delete(expectedObservations, fmt.Sprintf("%s/subdir/abc.json", dirName)) + delete(expectedObservations, fmt.Sprintf("%s/subdir/nested/nested.json", dirName)) + waitForObsUpdate(t, om, expectedObservations) + + obsConfig := &ServiceObsConfig{ + ServiceName: "testing10", + Topic: "testing1.topic", + Credentials: "../test/myuser.creds", + } + obsConfigJSON, err := json.Marshal(obsConfig) + if err != nil { + t.Fatalf("marshalling error: %s", err) + } + + obsPath = fmt.Sprintf("%s/another.json", dirName) + if err := os.WriteFile(obsPath, obsConfigJSON, 0o600); err != nil { + t.Fatalf("Error writing observation config file: %s", err) + } + + obsConfig.ID = obsPath + expectedObservations[obsPath] = obsConfig + waitForObsUpdate(t, om, expectedObservations) + }) +} diff --git a/surveyor/surveyor.go b/surveyor/surveyor.go index a7805c2..e4d5d62 100644 --- a/surveyor/surveyor.go +++ b/surveyor/surveyor.go @@ -70,7 +70,6 @@ type Options struct { HTTPCertFile string HTTPKeyFile string HTTPCaFile string - NATSServerURL string HTTPUser string // User in metrics scrape by Prometheus. HTTPPassword string Prefix string // TODO @@ -105,74 +104,18 @@ func GetDefaultOptions() *Options { // A Surveyor instance type Surveyor struct { sync.Mutex - opts Options - logger *logrus.Logger - listener net.Listener + cp *natsConnPool httpServer *http.Server + jsAdvisoryManager *JSAdvisoryManager + jsAdvisoryFSWatcher *jsAdvisoryFSWatcher + listener net.Listener + logger *logrus.Logger + opts Options promRegistry *prometheus.Registry - reconnectCtr *prometheus.CounterVec statzC *StatzCollector serviceObsManager *ServiceObsManager - serviceObsFsWatcher *serviceObsFSWatcher - jsAPIMetrics *JSAdvisoryMetrics - jsAPIAudits []*JSAdvisoryListener - stop chan struct{} -} - -func connect(opts *Options, reconnectCtr *prometheus.CounterVec) (*nats.Conn, error) { - nopts := []nats.Option{ - nats.Name(opts.Name), - } - - switch { - case opts.Credentials != "": - nopts = append(nopts, nats.UserCredentials(opts.Credentials)) - case opts.Nkey != "": - o, err := nats.NkeyOptionFromSeed(opts.Nkey) - if err != nil { - return nil, fmt.Errorf("unable to load nkey: %v", err) - } - nopts = append(nopts, o) - case opts.NATSUser != "": - nopts = append(nopts, nats.UserInfo(opts.NATSUser, opts.NATSPassword)) - } - - nopts = append(nopts, nats.DisconnectErrHandler(func(c *nats.Conn, err error) { - if err != nil { - opts.Logger.Warnf("%q disconnected, will possibly miss replies: %v", c.Opts.Name, err) - } - })) - nopts = append(nopts, nats.ReconnectHandler(func(c *nats.Conn) { - reconnectCtr.WithLabelValues(c.Opts.Name).Inc() - opts.Logger.Infof("%q reconnected to %v", c.Opts.Name, c.ConnectedAddr()) - })) - nopts = append(nopts, nats.ClosedHandler(func(c *nats.Conn) { - opts.Logger.Infof("%q connection closing", c.Opts.Name) - })) - nopts = append(nopts, nats.ErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { - if s != nil { - opts.Logger.Warnf("Error: name=%q err=%v", c.Opts.Name, err) - } else { - opts.Logger.Warnf("Error: name=%q, subject=%s, err=%v", c.Opts.Name, s.Subject, err) - } - })) - nopts = append(nopts, nats.MaxReconnects(10240)) - - // NATS TLS Options - if opts.CaFile != "" { - nopts = append(nopts, nats.RootCAs(opts.CaFile)) - } - if opts.CertFile != "" { - nopts = append(nopts, nats.ClientCert(opts.CertFile, opts.KeyFile)) - } - - nc, err := nats.Connect(opts.URLs, nopts...) - if err != nil { - return nil, err - } - opts.Logger.Infof("%s connected to NATS Deployment: %v", opts.Name, nc.ConnectedAddr()) - - return nc, err + serviceObsFSWatcher *serviceObsFSWatcher + sysAcctPC *pooledNatsConn } // NewSurveyor creates a surveyor @@ -188,37 +131,69 @@ func NewSurveyor(opts *Options) (*Surveyor, error) { ConstLabels: opts.ConstLabels, }, []string{"name"}) promRegistry.MustRegister(reconnectCtr) + cp := newSurveyorConnPool(opts, reconnectCtr) serviceObsMetrics := NewServiceObservationMetrics(promRegistry, opts.ConstLabels) - serviceObsManager := newServiceObservationManager(opts.Logger, *opts, serviceObsMetrics, reconnectCtr) - serviceObsFsWatcher := newServiceObservationFSWatcher(opts.Logger, serviceObsManager) - jsAPIMetrics := NewJetStreamAdvisoryMetrics(promRegistry, opts.ConstLabels) + serviceObsManager := newServiceObservationManager(cp, opts.Logger, serviceObsMetrics) + serviceFsWatcher := newServiceObservationFSWatcher(opts.Logger, serviceObsManager) + jsAdvisoryMetrics := NewJetStreamAdvisoryMetrics(promRegistry, opts.ConstLabels) + jsAdvisoryManager := newJetStreamAdvisoryManager(cp, opts.Logger, jsAdvisoryMetrics) + jsFsWatcher := newJetStreamAdvisoryFSWatcher(opts.Logger, jsAdvisoryManager) return &Surveyor{ - opts: *opts, + cp: cp, + jsAdvisoryManager: jsAdvisoryManager, + jsAdvisoryFSWatcher: jsFsWatcher, logger: opts.Logger, + opts: *opts, promRegistry: promRegistry, - reconnectCtr: reconnectCtr, serviceObsManager: serviceObsManager, - serviceObsFsWatcher: serviceObsFsWatcher, - jsAPIMetrics: jsAPIMetrics, + serviceObsFSWatcher: serviceFsWatcher, }, nil } +func newSurveyorConnPool(opts *Options, reconnectCtr *prometheus.CounterVec) *natsConnPool { + natsDefaults := &natsContextDefaults{ + Name: opts.Name, + URL: opts.URLs, + TLSCert: opts.CertFile, + TLSKey: opts.KeyFile, + TLSCA: opts.CaFile, + } + natsOpts := []nats.Option{ + nats.DisconnectErrHandler(func(c *nats.Conn, err error) { + if err != nil { + opts.Logger.Warnf("%q disconnected, will possibly miss replies: %v", c.Opts.Name, err) + } + }), + nats.ReconnectHandler(func(c *nats.Conn) { + reconnectCtr.WithLabelValues(c.Opts.Name).Inc() + opts.Logger.Infof("%q reconnected to %v", c.Opts.Name, c.ConnectedAddr()) + }), + nats.ClosedHandler(func(c *nats.Conn) { + opts.Logger.Infof("%q connection closing", c.Opts.Name) + }), + nats.ErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { + if s != nil { + opts.Logger.Warnf("Error: name=%q, subject=%s, err=%v", c.Opts.Name, s.Subject, err) + } else { + opts.Logger.Warnf("Error: name=%q err=%v", c.Opts.Name, err) + } + }), + nats.MaxReconnects(10240), + } + return newNatsConnPool(opts.Logger, natsDefaults, natsOpts) +} + func (s *Surveyor) createStatszCollector() error { if s.opts.ExpectedServers == 0 { return nil } - nc, err := connect(&s.opts, s.reconnectCtr) - if err != nil { - return err - } - if !s.opts.Accounts { s.logger.Debugln("Skipping per-account exports") } - s.statzC = NewStatzCollector(nc, s.logger, s.opts.ExpectedServers, s.opts.ServerResponseWait, s.opts.PollTimeout, s.opts.Accounts, s.opts.ConstLabels) + s.statzC = NewStatzCollector(s.sysAcctPC.nc, s.logger, s.opts.ExpectedServers, s.opts.ServerResponseWait, s.opts.PollTimeout, s.opts.Accounts, s.opts.ConstLabels) s.promRegistry.MustRegister(s.statzC) return nil } @@ -391,54 +366,41 @@ func (s *Surveyor) startHTTP() error { return nil } -func (s *Surveyor) startJetStreamAdvisories() error { - s.jsAPIAudits = []*JSAdvisoryListener{} - s.jsAPIMetrics.jsAdvisoriesGauge.Set(0) +func (s *Surveyor) startJetStreamAdvisories() { + if s.jsAdvisoryManager.IsRunning() { + return + } + s.jsAdvisoryManager.start() dir := s.opts.JetStreamConfigDir if dir == "" { - s.logger.Debugln("Skipping JetStream API Audit startup, no directory configured") - return nil + s.logger.Debugln("skipping JetStream advisory startup, no directory configured") + return } fs, err := os.Stat(dir) if err != nil { - return fmt.Errorf("could not start JetStream API Audit, %s does not exist", dir) + s.logger.Warnf("could not start JetStream advisories, dir %s does not exist", dir) + return } if !fs.IsDir() { - return fmt.Errorf("JetStream API Audit dir %s is not a directory", dir) + s.logger.Warnf("JetStream advisories dir %s is not a directory", dir) + return } - // TODO: new watcher should be created in each directory - err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - - if filepath.Ext(info.Name()) != ".json" { - return nil - } - - obs, err := NewJetStreamAdvisoryListener(path, s.opts, s.jsAPIMetrics, s.reconnectCtr) - if err != nil { - return fmt.Errorf("could not create JetStream API Audit from %s: %s", path, err) - } - - err = obs.Start() - if err != nil { - return fmt.Errorf("could not start observation from %s: %s", path, err) - } - - s.jsAPIAudits = append(s.jsAPIAudits, obs) - - return nil - }) + if err := s.jsAdvisoryFSWatcher.start(); err != nil { + s.logger.Warnf("could not start JetStream advisories filesystem watcher: %s", err) + } - return err + err = filepath.WalkDir(dir, s.jsAdvisoryFSWatcher.startAdvisoriesInDir()) + if err != nil { + s.logger.Warnf("error traversing JetStream advisories dir: %s, error: %s", dir, err) + return + } } -func (s *Surveyor) startObservations() { +func (s *Surveyor) startServiceObservations() { if s.serviceObsManager.IsRunning() { return } @@ -446,28 +408,28 @@ func (s *Surveyor) startObservations() { s.serviceObsManager.start() dir := s.opts.ObservationConfigDir if dir == "" { - s.logger.Debugln("Skipping observation startup, no directory configured") + s.logger.Debugln("skipping service observation startup, no directory configured") return } fs, err := os.Stat(dir) if err != nil { - s.logger.Warnf("could not start observations, %s does not exist", dir) + s.logger.Warnf("could not start service observations, dir %s does not exist", dir) return } if !fs.IsDir() { - s.logger.Warnf("observations dir %s is not a directory", dir) + s.logger.Warnf("service observations dir %s is not a directory", dir) return } - if err := s.serviceObsFsWatcher.start(); err != nil { - s.logger.Warnf("could not start filesystem watcher: %s", err) + if err := s.serviceObsFSWatcher.start(); err != nil { + s.logger.Warnf("could not start service observations filesystem watcher: %s", err) } - err = filepath.WalkDir(dir, s.serviceObsFsWatcher.startObservationsInDir()) + err = filepath.WalkDir(dir, s.serviceObsFSWatcher.startObservationsInDir()) if err != nil { - s.logger.Warnf("error traversing observations dir: %s: %s", dir, err) + s.logger.Warnf("error traversing service observations dir: %s, error: %s", dir, err) return } } @@ -476,12 +438,31 @@ func (s *Surveyor) ServiceObservationManager() *ServiceObsManager { return s.serviceObsManager } +func (s *Surveyor) JetStreamAdvisoryManager() *JSAdvisoryManager { + return s.jsAdvisoryManager +} + // Start starts the surveyor func (s *Surveyor) Start() error { s.Lock() defer s.Unlock() + if s.sysAcctPC != nil { + // already running + return nil + } - s.stop = make(chan struct{}) + var err error + natsCtx := &natsContext{ + Credentials: s.opts.Credentials, + Nkey: s.opts.Nkey, + Username: s.opts.NATSUser, + Password: s.opts.NATSPassword, + } + + s.sysAcctPC, err = s.cp.Get(natsCtx) + if err != nil { + return err + } if s.statzC == nil { if err := s.createStatszCollector(); err != nil { @@ -489,13 +470,8 @@ func (s *Surveyor) Start() error { } } - s.startObservations() - - if s.jsAPIAudits == nil { - if err := s.startJetStreamAdvisories(); err != nil { - return err - } - } + s.startServiceObservations() + s.startJetStreamAdvisories() if !s.opts.DisableHTTPServer && s.listener == nil && s.httpServer == nil { if err := s.startHTTP(); err != nil { @@ -510,6 +486,10 @@ func (s *Surveyor) Start() error { func (s *Surveyor) Stop() { s.Lock() defer s.Unlock() + if s.sysAcctPC == nil { + // already stopped + return + } if s.httpServer != nil { _ = s.httpServer.Shutdown(context.Background()) @@ -523,20 +503,17 @@ func (s *Surveyor) Stop() { if s.statzC != nil { s.promRegistry.Unregister(s.statzC) - s.statzC.nc.Close() s.statzC = nil } - s.serviceObsFsWatcher.stop() + s.serviceObsFSWatcher.stop() s.serviceObsManager.stop() - if s.jsAPIAudits != nil { - for _, j := range s.jsAPIAudits { - j.nc.Close() - } - s.jsAPIAudits = nil - } - close(s.stop) + s.jsAdvisoryFSWatcher.stop() + s.jsAdvisoryManager.stop() + + s.sysAcctPC.ReturnToPool() + s.sysAcctPC = nil } // Gather implements the prometheus.Gatherer interface diff --git a/surveyor/surveyor_test.go b/surveyor/surveyor_test.go index 9e2c0da..7bb1a19 100644 --- a/surveyor/surveyor_test.go +++ b/surveyor/surveyor_test.go @@ -16,18 +16,17 @@ package surveyor import ( "crypto/tls" "crypto/x509" - "encoding/json" "fmt" "io" "net/http" "os" - "reflect" "strings" "testing" "time" + "github.com/nats-io/nats.go" + st "github.com/nats-io/nats-surveyor/test" - ptu "github.com/prometheus/client_golang/prometheus/testutil" ) // Testing constants @@ -306,7 +305,7 @@ func TestSurveyor_Reconnect(t *testing.T) { func TestSurveyor_ClientTLSFail(t *testing.T) { ns := st.StartServer(t, "../test/r1s1.conf") - st.ConnectAndVerify(t, ns.ClientURL()) + st.ConnectAndVerify(t, ns.ClientURL(), nats.UserCredentials("../test/myuser.creds")) defer ns.Shutdown() opts := getTestOptions() @@ -510,427 +509,6 @@ func TestSurveyor_MissingResponses(t *testing.T) { }) } -func TestSurveyor_ObservationsFromFile(t *testing.T) { - sc := st.NewSuperCluster(t) - defer sc.Shutdown() - - opts := getTestOptions() - opts.ObservationConfigDir = "testdata/goodobs" - - s, err := NewSurveyor(opts) - if err != nil { - t.Fatalf("couldn't create surveyor: %v", err) - } - if err = s.Start(); err != nil { - t.Fatalf("start error: %v", err) - } - defer s.Stop() - - if ptu.ToFloat64(s.ServiceObservationManager().metrics.observationsGauge) != 1 { - t.Fatalf("process error: observations not started") - } -} - -func TestSurveyor_Observations(t *testing.T) { - sc := st.NewSuperCluster(t) - defer sc.Shutdown() - - opts := getTestOptions() - - s, err := NewSurveyor(opts) - if err != nil { - t.Fatalf("couldn't create surveyor: %v", err) - } - if err = s.Start(); err != nil { - t.Fatalf("start error: %v", err) - } - defer s.Stop() - om := s.ServiceObservationManager() - - expectedObservations := make(map[string]*ServiceObsConfig) - observations := []*ServiceObsConfig{ - { - ID: "srv1", - ServiceName: "srv1", - Topic: "testing.topic", - Credentials: "../test/myuser.creds", - Nkey: "", - }, - { - ID: "srv2", - ServiceName: "srv2", - Topic: "testing.topic", - Credentials: "../test/myuser.creds", - }, - { - ID: "srv3", - ServiceName: "srv3", - Topic: "testing.topic", - Credentials: "../test/myuser.creds", - }, - } - - obsIDs := make([]string, 0) - for _, obs := range observations { - err := om.Set(obs) - if err != nil { - t.Errorf("Unexpected error on observation set: %s", err) - } - obsIDs = append(obsIDs, obs.ID) - expectedObservations[obs.ID] = obs - } - waitForMetricUpdate(t, om, expectedObservations) - - setObservation := &ServiceObsConfig{ - ID: obsIDs[0], - ServiceName: "srv4", - Topic: "testing_updated.topic", - Credentials: "../test/myuser.creds", - } - expectedObservations[obsIDs[0]] = setObservation - err = om.Set(setObservation) - if err != nil { - t.Errorf("Unexpected error on observation set: %s", err) - } - waitForMetricUpdate(t, om, expectedObservations) - var found bool - obsMap := om.ConfigMap() - for _, obs := range obsMap { - if obs.ServiceName == "srv4" { - found = true - break - } - } - - if !found { - t.Errorf("Expected updated service name in observations: %s", "srv4") - } - deleteID := obsIDs[0] - err = om.Delete(deleteID) - delete(expectedObservations, deleteID) - if err != nil { - t.Errorf("Unexpected error on observation delete request: %s", err) - } - waitForMetricUpdate(t, om, expectedObservations) - - // observation no longer exists - err = om.Delete(deleteID) - if err == nil { - t.Error("Expected error; got nil") - } - waitForMetricUpdate(t, om, expectedObservations) -} - -func TestSurveyor_ObservationsError(t *testing.T) { - sc := st.NewSuperCluster(t) - defer sc.Shutdown() - - opts := getTestOptions() - - s, err := NewSurveyor(opts) - if err != nil { - t.Fatalf("couldn't create surveyor: %v", err) - } - if err = s.Start(); err != nil { - t.Fatalf("start error: %v", err) - } - defer s.Stop() - om := s.ServiceObservationManager() - if err != nil { - t.Fatalf("Error creating observations manager: %s", err) - } - - // add invalid observation (missing service name) - err = om.Set( - &ServiceObsConfig{ - ID: "id", - ServiceName: "", - Topic: "testing.topic", - Credentials: "../test/myuser.creds", - }, - ) - - if err == nil { - t.Errorf("Expected error; got nil") - } - - // valid observation, no error - err = om.Set( - &ServiceObsConfig{ - ID: "id", - ServiceName: "srv", - Topic: "testing.topic", - Credentials: "../test/myuser.creds", - }, - ) - - if err != nil { - t.Errorf("Expected no error; got: %s", err) - } - - // update error, invalid config - err = om.Set( - &ServiceObsConfig{ - ID: "srv", - ServiceName: "srv", - Topic: "", - Credentials: "../test/myuser.creds", - }, - ) - - if err == nil { - t.Errorf("Expected error; got nil") - } -} - -func waitForMetricUpdate(t *testing.T, om *ServiceObsManager, expectedObservations map[string]*ServiceObsConfig) { - t.Helper() - ticker := time.NewTicker(50 * time.Millisecond) - timeout := time.After(5 * time.Second) - defer ticker.Stop() -Outer: - for { - select { - case <-ticker.C: - observationsNum := ptu.ToFloat64(om.metrics.observationsGauge) - if observationsNum == float64(len(expectedObservations)) { - break Outer - } - case <-timeout: - observationsNum := ptu.ToFloat64(om.metrics.observationsGauge) - t.Fatalf("process error: invalid number of observations; want: %d; got: %f\n", len(expectedObservations), observationsNum) - return - } - } - - existingObservations := om.ConfigMap() - if len(existingObservations) != len(expectedObservations) { - t.Fatalf("Unexpected number of observations; want: %d; got: %d", len(expectedObservations), len(existingObservations)) - } - for _, existingObservation := range existingObservations { - obs, ok := expectedObservations[existingObservation.ID] - if !ok { - t.Fatalf("Missing observation with ID: %s", existingObservation.ID) - } - if !reflect.DeepEqual(obs, existingObservation) { - t.Fatalf("Invalid observation config; want: %+v; got: %+v", obs, existingObservation) - } - } -} - -func TestSurveyor_ObservationsWatcher(t *testing.T) { - sc := st.NewSuperCluster(t) - defer sc.Shutdown() - - opts := getTestOptions() - - dirName := fmt.Sprintf("testdata/obs%d", time.Now().UnixNano()) - if err := os.Mkdir(dirName, 0700); err != nil { - t.Fatalf("Error creating observations dir: %s", err) - } - defer os.RemoveAll(dirName) - opts.ObservationConfigDir = dirName - - s, err := NewSurveyor(opts) - if err != nil { - t.Fatalf("couldn't create surveyor: %v", err) - } - if err = s.Start(); err != nil { - t.Fatalf("start error: %v", err) - } - defer s.Stop() - time.Sleep(200 * time.Millisecond) - - om := s.ServiceObservationManager() - expectedObservations := make(map[string]*ServiceObsConfig) - - t.Run("write observation file - create operation", func(t *testing.T) { - obsConfig := &ServiceObsConfig{ - ServiceName: "testing1", - Topic: "testing1.topic", - Credentials: "../test/myuser.creds", - } - obsConfigJSON, err := json.Marshal(obsConfig) - if err != nil { - t.Fatalf("marshalling error: %s", err) - } - obsPath := fmt.Sprintf("%s/create.json", dirName) - if err := os.WriteFile(obsPath, obsConfigJSON, 0600); err != nil { - t.Fatalf("Error writing observation config file: %s", err) - } - - obsConfig.ID = obsPath - expectedObservations[obsPath] = obsConfig - waitForMetricUpdate(t, om, expectedObservations) - }) - - t.Run("first create then write to file - write operation", func(t *testing.T) { - obsConfig := &ServiceObsConfig{ - ServiceName: "testing2", - Topic: "testing2.topic", - Credentials: "../test/myuser.creds", - } - obsConfigJSON, err := json.Marshal(obsConfig) - if err != nil { - t.Fatalf("marshalling error: %s", err) - } - obsPath := fmt.Sprintf("%s/write.json", dirName) - f, err := os.Create(obsPath) - if err != nil { - t.Fatalf("Error writing observation config file: %s", err) - } - if err := f.Close(); err != nil { - t.Fatalf("Error closing file: %s", err) - } - time.Sleep(200 * time.Millisecond) - if err := os.WriteFile(obsPath, obsConfigJSON, 0600); err != nil { - t.Fatalf("Error writing to file: %s", err) - } - - obsConfig.ID = obsPath - expectedObservations[obsPath] = obsConfig - waitForMetricUpdate(t, om, expectedObservations) - }) - - t.Run("create observations in subfolder", func(t *testing.T) { - obsConfig := &ServiceObsConfig{ - ServiceName: "testing3", - Topic: "testing3.topic", - Credentials: "../test/myuser.creds", - } - obsConfigJSON, err := json.Marshal(obsConfig) - if err != nil { - t.Fatalf("marshalling error: %s", err) - } - - if err := os.Mkdir(fmt.Sprintf("%s/subdir", dirName), 0700); err != nil { - t.Fatalf("Error creating subdirectory: %s", err) - } - time.Sleep(100 * time.Millisecond) - - obsPath := fmt.Sprintf("%s/subdir/subobs.json", dirName) - - err = os.WriteFile(obsPath, obsConfigJSON, 0600) - if err != nil { - t.Fatalf("Error writing observation config file: %s", err) - } - - obsConfig.ID = obsPath - expectedObservations[obsPath] = obsConfig - waitForMetricUpdate(t, om, expectedObservations) - - obsConfig = &ServiceObsConfig{ - ServiceName: "testing4", - Topic: "testing4.topic", - Credentials: "../test/myuser.creds", - } - obsConfigJSON, err = json.Marshal(obsConfig) - if err != nil { - t.Fatalf("marshalling error: %s", err) - } - obsPath = fmt.Sprintf("%s/subdir/abc.json", dirName) - - if err := os.WriteFile(obsPath, obsConfigJSON, 0600); err != nil { - t.Fatalf("Error writing observation config file: %s", err) - } - - obsConfig.ID = obsPath - expectedObservations[obsPath] = obsConfig - waitForMetricUpdate(t, om, expectedObservations) - - obsConfig = &ServiceObsConfig{ - ServiceName: "testing5", - Topic: "testing5.topic", - Credentials: "../test/myuser.creds", - } - obsConfigJSON, err = json.Marshal(obsConfig) - if err != nil { - t.Fatalf("marshalling error: %s", err) - } - if err := os.Mkdir(fmt.Sprintf("%s/subdir/nested", dirName), 0700); err != nil { - t.Fatalf("Error creating subdirectory: %s", err) - } - time.Sleep(100 * time.Millisecond) - - obsPath = fmt.Sprintf("%s/subdir/nested/nested.json", dirName) - err = os.WriteFile(obsPath, obsConfigJSON, 0600) - if err != nil { - t.Fatalf("Error writing observation config file: %s", err) - } - - obsConfig.ID = obsPath - expectedObservations[obsPath] = obsConfig - waitForMetricUpdate(t, om, expectedObservations) - }) - - t.Run("update observations", func(t *testing.T) { - obsConfig := &ServiceObsConfig{ - ServiceName: "testing_updated", - Topic: "testing_updated.topic", - Credentials: "../test/myuser.creds", - } - obsConfigJSON, err := json.Marshal(obsConfig) - if err != nil { - t.Fatalf("marshalling error: %s", err) - } - - obsPath := fmt.Sprintf("%s/write.json", dirName) - if err := os.WriteFile(obsPath, obsConfigJSON, 0600); err != nil { - t.Fatalf("Error writing to file: %s", err) - } - - obsConfig.ID = obsPath - expectedObservations[obsPath] = obsConfig - waitForMetricUpdate(t, om, expectedObservations) - - // update file with invalid JSON - existing observation should not be impacted - if err := os.WriteFile(obsPath, []byte("abc"), 0600); err != nil { - t.Fatalf("Error writing to file: %s", err) - } - time.Sleep(100 * time.Millisecond) - waitForMetricUpdate(t, om, expectedObservations) - }) - - t.Run("remove observations", func(t *testing.T) { - // remove single observation - obsPath := fmt.Sprintf("%s/create.json", dirName) - if err := os.Remove(obsPath); err != nil { - t.Fatalf("Error removing observation config: %s", err) - } - delete(expectedObservations, obsPath) - waitForMetricUpdate(t, om, expectedObservations) - - // remove whole subfolder - if err := os.RemoveAll(fmt.Sprintf("%s/subdir", dirName)); err != nil { - t.Fatalf("Error removing subdirectory: %s", err) - } - - delete(expectedObservations, fmt.Sprintf("%s/subdir/subobs.json", dirName)) - delete(expectedObservations, fmt.Sprintf("%s/subdir/abc.json", dirName)) - delete(expectedObservations, fmt.Sprintf("%s/subdir/nested/nested.json", dirName)) - waitForMetricUpdate(t, om, expectedObservations) - - obsConfig := &ServiceObsConfig{ - ServiceName: "testing10", - Topic: "testing1.topic", - Credentials: "../test/myuser.creds", - } - obsConfigJSON, err := json.Marshal(obsConfig) - if err != nil { - t.Fatalf("marshalling error: %s", err) - } - - obsPath = fmt.Sprintf("%s/another.json", dirName) - if err := os.WriteFile(obsPath, obsConfigJSON, 0600); err != nil { - t.Fatalf("Error writing observation config file: %s", err) - } - - obsConfig.ID = obsPath - expectedObservations[obsPath] = obsConfig - waitForMetricUpdate(t, om, expectedObservations) - }) -} - func TestSurveyor_ConcurrentBlock(t *testing.T) { sc := st.NewSuperCluster(t) defer sc.Shutdown() diff --git a/surveyor/testdata/badjs/badauth.json b/surveyor/testdata/badjs/badauth.json new file mode 100644 index 0000000..db78093 --- /dev/null +++ b/surveyor/testdata/badjs/badauth.json @@ -0,0 +1,5 @@ +{ + "name": "testing", + "username": "z", + "password": "z" +} \ No newline at end of file diff --git a/surveyor/testdata/goodjs/global.json b/surveyor/testdata/goodjs/global.json index d0a6063..dbe6d3e 100644 --- a/surveyor/testdata/goodjs/global.json +++ b/surveyor/testdata/goodjs/global.json @@ -1,3 +1,3 @@ { - "name": "global" + "name": "global" } diff --git a/test/c.nkey b/test/c.nkey new file mode 100644 index 0000000..7d59933 --- /dev/null +++ b/test/c.nkey @@ -0,0 +1 @@ +SUAO2FIGX2CPBN27GU4M23EFA3STF3LB6JLAFYOFGJED35BZXR336LX5EY \ No newline at end of file diff --git a/test/jetstream.conf b/test/jetstream.conf index 54cc5fb..0930287 100644 --- a/test/jetstream.conf +++ b/test/jetstream.conf @@ -3,3 +3,40 @@ jetstream {} server_name: jetstream listen: 0.0.0.0:-1 + +system_account: sys +no_auth_user: global + +accounts { + a: { + jetstream: enabled, + users: [ + {user: a, password: a} + ] + } + b: { + jetstream: enabled, + users: [ + { user: b, password: b } + ] + } + c: { + jetstream: enabled, + users: [ + { nkey: UCUCQH5GPMLKUB4NCR5EIAGJ5E5RGB57VRO5CT5XAEQTID75ERQRVUQV } + ] + } + d: { + jetstream: enabled, + users: [ + { user: d, password: d } + ] + } + global: { + jetstream: enabled, + users: [ + {user: global, password: global} + ] + } + sys: {} +} diff --git a/test/test.go b/test/test.go index d6a2b53..2c8c6e2 100644 --- a/test/test.go +++ b/test/test.go @@ -23,7 +23,7 @@ import ( "github.com/nats-io/nats-server/v2/logger" ns "github.com/nats-io/nats-server/v2/server" - nats "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go" ) // Test variables @@ -103,7 +103,6 @@ func StartBasicServer() *ns.Server { func StartServer(t *testing.T, confFile string) *ns.Server { resetPreviousHTTPConnections() opts, err := ns.ProcessConfigFile(confFile) - if err != nil { t.Fatalf("Error processing config file: %v", err) } @@ -151,7 +150,7 @@ func NewSuperCluster(t *testing.T) *SuperCluster { // NewSingleServer creates a single NATS server with a system account func NewSingleServer(t *testing.T) *ns.Server { s := StartServer(t, "../test/r1s1.conf") - ConnectAndVerify(t, s.ClientURL()) + ConnectAndVerify(t, s.ClientURL(), nats.UserCredentials("../test/myuser.creds")) return s } @@ -174,8 +173,8 @@ func (sc *SuperCluster) Shutdown() { // ConnectAndVerify connects to a server a verifies it is // ready to process messages. -func ConnectAndVerify(t *testing.T, url string) *nats.Conn { - c, err := nats.Connect(url, nats.UserCredentials("../test/myuser.creds")) +func ConnectAndVerify(t *testing.T, url string, options ...nats.Option) *nats.Conn { + c, err := nats.Connect(url, options...) if err != nil { t.Fatalf("Couldn't connect a client to %s: %v", url, err) } @@ -198,7 +197,7 @@ func ConnectAndVerify(t *testing.T, url string) *nats.Conn { func (sc *SuperCluster) setupClientsAndVerify(t *testing.T) { for _, s := range sc.Servers { - c := ConnectAndVerify(t, s.ClientURL()) + c := ConnectAndVerify(t, s.ClientURL(), nats.UserCredentials("../test/myuser.creds")) sc.Clients = append(sc.Clients, c) } diff --git a/test/test_test.go b/test/test_test.go index ab36f32..1f06436 100644 --- a/test/test_test.go +++ b/test/test_test.go @@ -16,6 +16,8 @@ package test import ( "testing" + + "github.com/nats-io/nats.go" ) // Keep our coverage up @@ -37,6 +39,11 @@ func TestStartSingleServer(t *testing.T) { func TestStartServers(t *testing.T) { ns := StartServer(t, "../test/r1s1.conf") - ConnectAndVerify(t, ns.ClientURL()) + ConnectAndVerify(t, ns.ClientURL(), nats.UserCredentials("../test/myuser.creds")) + ns.Shutdown() +} + +func TestStartJetStreamServer(t *testing.T) { + ns := NewJetStreamServer(t) ns.Shutdown() }