-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathpubsub.go
82 lines (68 loc) · 2.43 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
//Package pubsub implements publish subscriber patterns for usage in Golang
//go:generate mockgen -source pubsub.go -destination pubsub_mock.go -package pubsub
package pubsub
import (
"context"
"sync"
"time"
)
var (
clients = []*Client{&Client{Provider: NoopProvider{}}}
publishWaitGroup sync.WaitGroup
)
// Client holds a reference to a Provider
type Client struct {
ServiceName string
Provider Provider
Middleware []Middleware
}
// SetClient sets the global pubsub client, useful in tests
func SetClient(cli *Client) {
clients = []*Client{cli}
}
// GetClient get the global pubsub client, useful in tests
func GetClient() *Client {
return clients[0]
}
// AddPublisherClient allows another client to bet set, for publishing only
func AddPublisherClient(cli *Client) {
clients = append(clients, cli)
}
// Provider is generic interface for a pub sub provider
type Provider interface {
Publish(ctx context.Context, topic string, m *Msg) error
Subscribe(opts HandlerOptions, handler MsgHandler)
Shutdown()
}
// Subscriber is a service that listens to events and registers handlers
// for those events
type Subscriber interface {
// Setup is a required method that allows the subscriber service to add handlers
// and perform any setup if required, this is usually called by pubsub upon start
Setup(*Client)
}
// Msg is a lile representation of a pub sub message
type Msg struct {
ID string
Metadata map[string]string
Data []byte
PublishTime *time.Time
Ack func()
Nack func()
}
// Handler is a specific callback used for Subscribe in the format of..
// func(ctx context.Context, obj proto.Message, msg *Msg) error
// for example, you can unmarshal a custom type..
// func(ctx context.Context, accounts accounts.Account, msg *Msg) error
// you can also unmarshal a JSON object by supplying any type of interface{}
// func(ctx context.Context, accounts models.SomeJSONAccount, msg *Msg) error
type Handler interface{}
// MsgHandler is the internal or raw message handler
type MsgHandler func(ctx context.Context, m Msg) error
// PublishHandler wraps a call to publish, for interception
type PublishHandler func(ctx context.Context, topic string, m *Msg) error
// Middleware is an interface to provide subscriber and publisher interceptors
type Middleware interface {
SubscribeInterceptor(opts HandlerOptions, next MsgHandler) MsgHandler
PublisherMsgInterceptor(serviceName string, next PublishHandler) PublishHandler
}