forked from vendasta/pubsub-plugin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.ts
114 lines (103 loc) · 3.74 KB
/
index.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import { Plugin, PluginMeta, PluginEvent, RetryError } from '@posthog/plugin-scaffold'
import { PubSub, Topic } from "@google-cloud/pubsub"
type PubSubPlugin = Plugin<{
global: {
pubSubClient: PubSub
pubSubTopic: Topic
}
config: {
topicId: string
},
}>
export const setupPlugin: PubSubPlugin['setupPlugin'] = async (meta) => {
const { global, attachments, config } = meta
if (!attachments.googleCloudKeyJson) {
throw new Error('JSON config not provided!')
}
if (!config.topicId) {
throw new Error('Topic ID not provided!')
}
try {
const credentials = JSON.parse(attachments.googleCloudKeyJson.contents.toString())
global.pubSubClient = new PubSub({
projectId: credentials['project_id'],
credentials,
})
global.pubSubTopic = global.pubSubClient.topic(config.topicId);
// topic exists
await global.pubSubTopic.getMetadata()
} catch (error: any) {
// some other error? abort!
if (!error.message.includes("NOT_FOUND")) {
throw new Error(error)
}
console.log(`Creating PubSub Topic - ${config.topicId}`)
try {
await global.pubSubTopic.create()
} catch (error: any) {
// a different worker already created the table
if (!error.message.includes('ALREADY_EXISTS')) {
throw error
}
}
}
}
export async function exportEvents(events: PluginEvent[], { global, config }: PluginMeta<PubSubPlugin>) {
if (!global.pubSubClient) {
throw new Error('No PubSub client initialized!')
}
try {
const messages = events.map((fullEvent) => {
const { event, properties, $set, $set_once, distinct_id, team_id, site_url, now, sent_at, uuid, ...rest } =
fullEvent
const ip = properties?.['$ip'] || fullEvent.ip
const timestamp = fullEvent.timestamp || properties?.timestamp || now || sent_at
let ingestedProperties = properties
let elements = []
// only move prop to elements for the $autocapture action
if (event === '$autocapture' && properties?.['$elements']) {
const { $elements, ...props } = properties
ingestedProperties = props
elements = $elements
}
const message = {
event,
distinct_id,
team_id,
ip,
site_url,
timestamp,
uuid: uuid!,
properties: ingestedProperties || {},
elements: elements || [],
people_set: $set || {},
people_set_once: $set_once || {},
}
// Attributes to be sent with the pub/sub message
const attributes = {
event
}
return {data: Buffer.from(JSON.stringify(message)), attributes}
})
const start = Date.now()
await Promise.all(
messages.map((message) =>
global.pubSubTopic.publishMessage(message).then((messageId) => {
return messageId
})
)
)
const end = Date.now() - start
console.log(
`Published ${events.length} ${events.length > 1 ? 'events' : 'event'} to ${config.topicId}. Took ${
end / 1000
} seconds.`
)
} catch (error: any) {
console.error(
`Error publishing ${events.length} ${events.length > 1 ? 'events' : 'event'} to ${config.topicId}: `,
error
)
throw new RetryError(`Error publishing to Pub/Sub! ${JSON.stringify(error.errors)}`)
}
}