Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: pubsub subscriber topic not set #626

Merged
merged 7 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/quiet-papayas-greet.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@sebspark/pubsub": patch
---

Fixed bug that broke topic.initiate, leading to subscription creation failing.
67 changes: 50 additions & 17 deletions packages/pubsub/src/lib/subscriber.spec.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
import { beforeEach } from 'node:test'
import { PubSub, type Subscription, type Topic } from '@google-cloud/pubsub'
import type { Schema } from 'avsc'
import { type MockedObject, afterAll, describe, expect, it, vi } from 'vitest'
import { type MockedObject, beforeAll, describe, expect, it, vi } from 'vitest'
import { createSubscriber } from './subscriber'

type ExampleMessage = {
messageType: string
message: string
}

const message = {
messageType: 'type of message',
message: 'message data',
} satisfies ExampleMessage

type ExamplePubsubChannels = {
example: ExampleMessage
}
Expand Down Expand Up @@ -77,25 +73,62 @@ describe('subscriber', () => {
const topicName = 'example'
const subscriptionName = 'example-subscription'

it('uses an existing subscription if it exists', async () => {
const topicMock = new PubSub().topic(topicName) as MockedObject<Topic>
const subscriptionMock = topicMock.subscription(
let topicMock: MockedObject<Topic>
let subscriptionMock: MockedObject<Subscription>

beforeAll(() => {
topicMock = new PubSub().topic(topicName) as MockedObject<Topic>
subscriptionMock = topicMock.subscription(
subscriptionName
) as MockedObject<Subscription>
})

beforeEach(() => {})

subscriptionMock.exists.mockImplementation(() => [true])
describe('subscribe', () => {
it('uses an existing subscription if it exists', async () => {
const subscriber = createSubscriber<ExamplePubsubChannels>({
projectId: 'test',
})

const subscriber = createSubscriber<ExamplePubsubChannels>({
projectId: 'test',
topicMock.createSubscription.mockClear()

await subscriber.topic('example').subscribe('existing-subscription', {
onMessage: () => Promise.resolve(),
})

expect(topicMock.subscription).toHaveBeenCalled()
expect(topicMock.createSubscription.mock.calls.length).toBe(0)
})
})

describe('initiate', () => {
it('does not create a subscription if it exists', async () => {
subscriptionMock.exists.mockImplementationOnce(() => [true])

topicMock.createSubscription.mockClear()
const subscriber = createSubscriber<ExamplePubsubChannels>({
projectId: 'test',
})

await subscriber.topic('example').subscribe('existing-subscription', {
onMessage: () => Promise.resolve(),
topicMock.createSubscription.mockClear()

await subscriber.topic('example').initiate('existing-subscription')

expect(topicMock.createSubscription.mock.calls.length).toBe(0)
})

expect(topicMock.subscription).toHaveBeenCalled()
expect(topicMock.createSubscription.mock.calls.length).toBe(0)
it('creates a subscription if it does not exist', async () => {
subscriptionMock.exists.mockImplementationOnce(() => [false])

const subscriber = createSubscriber<ExamplePubsubChannels>({
projectId: 'test',
})

topicMock.createSubscription.mockClear()

await subscriber.topic('example').initiate('example-subscription')

expect(topicMock.createSubscription).toHaveBeenCalled()
})
})
})
9 changes: 3 additions & 6 deletions packages/pubsub/src/lib/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
type Topic,
} from '@google-cloud/pubsub'

const makeSureSubacriptionExists = async (
const makeSureSubscriptionExists = async (
topic: Topic,
name: string,
options?: PubSubOptions
Expand Down Expand Up @@ -68,16 +68,13 @@ export const createSubscriber = <T extends Record<string, unknown>>(

const typedClient: SubscriptionClient<T> = {
topic: (name) => {
let _topic: Topic
const _topic: Topic = client.topic(name as string)

return {
initiate: async (subscriptionName, options) => {
await makeSureSubacriptionExists(_topic, subscriptionName, options)
await makeSureSubscriptionExists(_topic, subscriptionName, options)
},
subscribe: async (subscriptionName, callbacks, options) => {
if (!_topic) {
_topic = client.topic(name as string)
}
const subscription = _topic.subscription(subscriptionName)
subscription.on('message', async (msg) => {
const data = JSON.parse(msg.data.toString('utf8'))
Expand Down
Loading