-
Notifications
You must be signed in to change notification settings - Fork 3
/
funcs.go
316 lines (277 loc) · 10 KB
/
funcs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
package inngestgo
import (
"context"
"reflect"
"time"
"github.com/gosimple/slug"
"github.com/inngest/inngest/pkg/inngest"
)
// Ptr converts the given type to a pointer. Nil pointers are sometimes
// used for optional arguments within configuration, meaning we need pointers
// within struct values. This util helps.
func Ptr[T any](i T) *T { return &i }
func BoolPtr(b bool) *bool { return &b }
func StrPtr(i string) *string { return &i }
func IntPtr(i int) *int { return &i }
type FunctionOpts struct {
// ID is an optional function ID. If not specified, the ID
// will be auto-generated by lowercasing and slugging the name.
ID string
// Name represents a human-readable function name.
Name string
Priority *inngest.Priority
Concurrency []inngest.Concurrency
Idempotency *string
Retries *int
Cancel []inngest.Cancel
Debounce *Debounce
// Timeouts represents timeouts for a function.
Timeouts *Timeouts
// Throttle represents a soft rate limit for gating function starts. Any function runs
// over the throttle period will be enqueued in the backlog to run at the next available
// time.
Throttle *Throttle
// RateLimit allows specifying custom rate limiting for the function. A RateLimit is
// hard rate limiting: any function invocations over the rate limit will be ignored and
// will never run.
RateLimit *RateLimit
// BatchEvents represents batching
BatchEvents *inngest.EventBatchConfig
}
// GetRateLimit returns the inngest.RateLimit for function configuration. The
// SDK's RateLimit type is incompatible with the inngest.RateLimit type signature
// for ease of definition.
func (f FunctionOpts) GetRateLimit() *inngest.RateLimit {
if f.RateLimit == nil {
return nil
}
return f.RateLimit.Convert()
}
// GetTimeouts returns the inngest.Timeouts in a compatible type signature.
func (f FunctionOpts) GetTimeouts() *inngest.Timeouts {
if f.Timeouts == nil {
return nil
}
return f.Timeouts.Convert()
}
// Debounce represents debounce configuration used when creating a new function within
// FunctionOpts
type Debounce struct {
// Key is an optional expression to use for constraining the debounce to a given
// value.
Key string `json:"key,omitempty"`
// Period is how long to listen for new events matching the optional key. Any event
// received during this period will reschedule the debounce to run after another period
// interval.
Period time.Duration `json:"period"`
// Timeout specifies the optional max lifetime of a debounce, ensuring that functions
// run after the given duration when a debounce is rescheduled indefinitely.
Timeout *time.Duration `json:"timeout,omitempty"`
}
// Throttle represents concurrency over time.
type Throttle struct {
// Limit is how often the function can be called within the specified period. The
// minimum limit is 1.
Limit uint `json:"limit"`
// Period represents the time period for throttling the function. The minimum
// granularity is 1 second. Run starts are spaced evenly through the given period.
Period time.Duration `json:"period"`
// Burst is number of runs allowed to start in the given window, in a single burst,
// before throttling applies.
//
// A burst > 1 bypasses smoothing for the burst and allows many runs to start
// at once, if desired. Defaults to 1, which disables bursting.
Burst uint `json:"burst"`
// Key is an optional string to constrain throttling using event data. For
// example, if you want to throttle incoming notifications based off of a user's
// ID in an event you can use the following key: "event.user.id". This ensures
// that we throttle functions for each user independently.
Key *string `json:"key,omitempty"`
}
type RateLimit struct {
// Limit is how often the function can be called within the specified period
Limit uint `json:"limit"`
// Period represents the time period for throttling the function
Period time.Duration `json:"period"`
// Key is an optional string to constrain rate limiting using event data. For
// example, if you want to rate limit incoming notifications based off of a user's
// ID in an event you can use the following key: "event.user.id". This ensures
// that we rate limit functions for each user independently.
Key *string `json:"key,omitempty"`
}
// Convert converts a RateLimit to an inngest.RateLimit
func (r RateLimit) Convert() *inngest.RateLimit {
return &inngest.RateLimit{
Limit: r.Limit,
Period: r.Period.String(),
Key: r.Key,
}
}
// Timeouts represents timeouts for the function. If any of the timeouts are hit, the function
// will be marked as cancelled with a cancellation reason.
type Timeouts struct {
// Start represents the timeout for starting a function. If the time between scheduling
// and starting a function exceeds this value, the function will be cancelled. Note that
// this is inclusive of time between retries.
//
// A function may exceed this duration because of concurrency limits, throttling, etc.
Start *time.Duration `json:"start,omitempty"`
// Finish represents the time between a function starting and the function finishing.
// If a function takes longer than this time to finish, the function is marked as cancelled.
// The start time is taken from the time that the first successful function request begins,
// and does not include the time spent in the queue before the function starts.
//
// Note that if the final request to a function begins before this timeout, and completes
// after this timeout, the function will succeed.
Finish *time.Duration `json:"finish,omitempty"`
}
func (t Timeouts) Convert() *inngest.Timeouts {
var start, finish *string
if t.Start != nil {
s := t.Start.String()
start = &s
}
if t.Finish != nil {
f := t.Finish.String()
finish = &f
}
return &inngest.Timeouts{
Start: start,
Finish: finish,
}
}
// CreateFunction creates a new function which can be registered within a handler.
//
// This function uses generics, allowing you to supply the event that triggers the function.
// For example, if you have a signup event defined as a struct you can use this to strongly
// type your input:
//
// type SignupEvent struct {
// Name string
// Data struct {
// Email string
// AccountID string
// }
// }
//
// f := CreateFunction(
// inngestgo.FunctionOptions{Name: "Post-signup flow"},
// inngestgo.EventTrigger("user/signed.up"),
// func(ctx context.Context, input gosdk.Input[SignupEvent]) (any, error) {
// // .. Your logic here. input.Event will be strongly typed as a SignupEvent.
// // step.Run(ctx, "Do some logic", func(ctx context.Context) (string, error) { return "hi", nil })
// },
// )
func CreateFunction[T any](
fc FunctionOpts,
trigger inngest.Trigger,
f SDKFunction[T],
) ServableFunction {
// Validate that the input type is a concrete type, and not an interface.
//
// The only exception is `any`, when users don't care about the input event
// eg. for cron based functions.
sf := servableFunc{
fc: fc,
trigger: trigger,
f: f,
}
zt := sf.ZeroType()
if zt.Interface() == nil && zt.NumMethod() > 0 {
panic("You cannot use an interface type as the input within an Inngest function.")
}
return sf
}
func EventTrigger(name string, expression *string) inngest.Trigger {
return inngest.Trigger{
EventTrigger: &inngest.EventTrigger{
Event: name,
Expression: expression,
},
}
}
func CronTrigger(cron string) inngest.Trigger {
return inngest.Trigger{
CronTrigger: &inngest.CronTrigger{
Cron: cron,
},
}
}
// SDKFunction represents a user-defined function to be called based off of events or
// on a schedule.
//
// The function is registered with the SDK by calling `CreateFunction` with the function
// name, the trigger, the event type for marshalling, and any options.
//
// This uses generics to strongly type input events:
//
// func(ctx context.Context, input gosdk.Input[SignupEvent]) (any, error) {
// // .. Your logic here. input.Event will be strongly typed as a SignupEvent.
// }
type SDKFunction[T any] func(ctx context.Context, input Input[T]) (any, error)
// ServableFunction defines a function which can be called by a handler's Serve method.
//
// This is created via CreateFunction in this package.
type ServableFunction interface {
// Slug returns the function's human-readable ID, such as "sign-up-flow".
Slug() string
// Name returns the function name.
Name() string
Config() FunctionOpts
// Trigger returns the event name or schedule that triggers the function.
Trigger() inngest.Trigger
// ZeroEvent returns the zero event type to marshal the event into, given an
// event name.
ZeroEvent() any
// Func returns the SDK function to call. This must alawys be of type SDKFunction,
// but has an any type as we register many functions of different types into a
// type-agnostic handler; this is a generic implementation detail, unfortunately.
Func() any
}
// Input is the input data passed to your function. It is comprised of the triggering event
// and call context.
type Input[T any] struct {
Event T `json:"event"`
Events []T `json:"events"`
InputCtx InputCtx `json:"ctx"`
}
type InputCtx struct {
Env string `json:"env"`
FunctionID string `json:"fn_id"`
RunID string `json:"run_id"`
StepID string `json:"step_id"`
Attempt int `json:"attempt"`
}
type servableFunc struct {
fc FunctionOpts
trigger inngest.Trigger
f any
}
func (s servableFunc) Config() FunctionOpts {
return s.fc
}
func (s servableFunc) Slug() string {
if s.fc.ID == "" {
return slug.Make(s.fc.Name)
}
return s.fc.ID
}
func (s servableFunc) Name() string {
return s.fc.Name
}
func (s servableFunc) Trigger() inngest.Trigger {
return s.trigger
}
func (s servableFunc) ZeroType() reflect.Value {
// Grab the concrete type from the generic Input[T] type. This lets us easily
// initialize new values of this type at runtime.
fVal := reflect.ValueOf(s.f)
inputVal := reflect.New(fVal.Type().In(1)).Elem()
return reflect.New(inputVal.FieldByName("Event").Type()).Elem()
}
func (s servableFunc) ZeroEvent() any {
return s.ZeroType().Interface()
}
func (s servableFunc) Func() any {
return s.f
}