Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(server): event bus refactor - access req emitter - batch #3 #3769

Open
wants to merge 1 commit into
base: fabians/web-2414-2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions packages/server/modules/accessrequests/domain/events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { ServerAccessRequestRecord } from '@/modules/accessrequests/repositories'
import { StreamRoles } from '@speckle/shared'

export const accessRequestEventsNamespace = 'accessrequests' as const

export const AccessRequestEvents = {
Created: `${accessRequestEventsNamespace}.created`,
Finalized: `${accessRequestEventsNamespace}.finalized`
} as const

export type AccessRequestEventsPayloads = {
[AccessRequestEvents.Created]: { request: ServerAccessRequestRecord }
[AccessRequestEvents.Finalized]: {
request: ServerAccessRequestRecord
/**
* ID of the user that finalized this request
*/
finalizedBy: string
/**
* If this object is set, request was approved
*/
approved?: {
role: StreamRoles
}
}
}
31 changes: 0 additions & 31 deletions packages/server/modules/accessrequests/events/emitter.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { db } from '@/db/knex'
import { AccessRequestsEmitter } from '@/modules/accessrequests/events/emitter'
import {
AccessRequestType,
createNewRequestFactory,
Expand Down Expand Up @@ -36,6 +35,7 @@ import {
} from '@/modules/core/services/streams/access'
import { authorizeResolver } from '@/modules/shared'
import { LogicError } from '@/modules/shared/errors'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { publish } from '@/modules/shared/utils/subscriptions'

const getUser = getUserFactory({ db })
Expand All @@ -52,7 +52,7 @@ const requestProjectAccess = requestProjectAccessFactory({
getUserStreamAccessRequest,
getStream,
createNewRequest: createNewRequestFactory({ db }),
accessRequestsEmitter: AccessRequestsEmitter.emit
emitEvent: getEventBus().emit
})

const requestStreamAccess = requestStreamAccessFactory({
Expand Down Expand Up @@ -90,7 +90,7 @@ const processPendingStreamRequest = processPendingStreamRequestFactory({
validateStreamAccess,
addOrUpdateStreamCollaborator,
deleteRequestById: deleteRequestByIdFactory({ db }),
accessRequestsEmitter: AccessRequestsEmitter.emit
emitEvent: getEventBus().emit
})

const processPendingProjectRequest = processPendingStreamRequest
Expand Down
4 changes: 2 additions & 2 deletions packages/server/modules/accessrequests/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { db } from '@/db/knex'
import { moduleLogger } from '@/logging/logging'
import { AccessRequestsEmitter } from '@/modules/accessrequests/events/emitter'
import { initializeEventListenerFactory } from '@/modules/accessrequests/services/eventListener'
import { getStreamCollaboratorsFactory } from '@/modules/core/repositories/streams'
import { publishNotification } from '@/modules/notifications/services/publication'
import { Optional, SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { getEventBus } from '@/modules/shared/services/eventBus'

let quitListeners: Optional<() => void> = undefined

Expand All @@ -16,7 +16,7 @@ const ServerAccessRequestsModule: SpeckleModule = {
const initializeEventListener = initializeEventListenerFactory({
getStreamCollaborators: getStreamCollaboratorsFactory({ db }),
publishNotification,
accessRequestsEventListener: AccessRequestsEmitter.listen
eventBus: getEventBus()
})
quitListeners = initializeEventListener()
}
Expand Down
26 changes: 10 additions & 16 deletions packages/server/modules/accessrequests/services/eventListener.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import {
AccessRequestsEmitter,
AccessRequestsEvents,
AccessRequestsEventsPayloads
} from '@/modules/accessrequests/events/emitter'
import { AccessRequestEvents } from '@/modules/accessrequests/domain/events'
import { isStreamAccessRequest } from '@/modules/accessrequests/repositories'
import { GetStreamCollaborators } from '@/modules/core/domain/streams/operations'
import { Roles } from '@/modules/core/helpers/mainConstants'
import {
NotificationPublisher,
NotificationType
} from '@/modules/notifications/helpers/types'
import { EventBus, EventPayload } from '@/modules/shared/services/eventBus'

type OnServerAccessRequestCreatedDeps = {
getStreamCollaborators: GetStreamCollaborators
Expand All @@ -18,8 +15,8 @@ type OnServerAccessRequestCreatedDeps = {

const onServerAccessRequestCreatedFactory =
(deps: OnServerAccessRequestCreatedDeps) =>
async (payload: AccessRequestsEventsPayloads[AccessRequestsEvents.Created]) => {
const { request } = payload
async (payload: EventPayload<typeof AccessRequestEvents.Created>) => {
const { request } = payload.payload

// Send out email to all owners of the stream
if (isStreamAccessRequest(request)) {
Expand All @@ -46,8 +43,8 @@ type OnServerAccessRequestFinalizedDeps = {

const onServerAccessRequestFinalizedFactory =
(deps: OnServerAccessRequestFinalizedDeps) =>
async (payload: AccessRequestsEventsPayloads[AccessRequestsEvents.Finalized]) => {
const { approved, request, finalizedBy } = payload
async (payload: EventPayload<typeof AccessRequestEvents.Finalized>) => {
const { approved, request, finalizedBy } = payload.payload

// Send out email to requester, if accepted
if (approved && isStreamAccessRequest(request)) {
Expand All @@ -67,7 +64,7 @@ const onServerAccessRequestFinalizedFactory =
export const initializeEventListenerFactory =
(
deps: {
accessRequestsEventListener: (typeof AccessRequestsEmitter)['listen']
eventBus: EventBus
} & OnServerAccessRequestCreatedDeps &
OnServerAccessRequestFinalizedDeps
) =>
Expand All @@ -76,12 +73,9 @@ export const initializeEventListenerFactory =
const onServerAccessRequestFinalized = onServerAccessRequestFinalizedFactory(deps)

const quitCbs = [
deps.accessRequestsEventListener(
AccessRequestsEvents.Created,
onServerAccessRequestCreated
),
deps.accessRequestsEventListener(
AccessRequestsEvents.Finalized,
deps.eventBus.listen(AccessRequestEvents.Created, onServerAccessRequestCreated),
deps.eventBus.listen(
AccessRequestEvents.Finalized,
onServerAccessRequestFinalized
)
]
Expand Down
25 changes: 16 additions & 9 deletions packages/server/modules/accessrequests/services/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import {
AccessRequestCreationError,
AccessRequestProcessingError
} from '@/modules/accessrequests/errors'
import { AccessRequestsEmitter } from '@/modules/accessrequests/events/emitter'
import { StreamAccessRequestGraphQLReturn } from '@/modules/accessrequests/helpers/graphTypes'
import {
AccessRequestType,
Expand Down Expand Up @@ -35,6 +34,8 @@ import {
GetStream,
ValidateStreamAccess
} from '@/modules/core/domain/streams/operations'
import { EventBusEmit } from '@/modules/shared/services/eventBus'
import { AccessRequestEvents } from '@/modules/accessrequests/domain/events'

function buildStreamAccessRequestGraphQLReturn(
record: ServerAccessRequestRecord<AccessRequestType.Stream, string>
Expand Down Expand Up @@ -85,7 +86,7 @@ export const requestProjectAccessFactory =
getUserStreamAccessRequest: GetUserStreamAccessRequest
getStream: GetStream
createNewRequest: CreateNewRequest
accessRequestsEmitter: (typeof AccessRequestsEmitter)['emit']
emitEvent: EventBusEmit
}): RequestProjectAccess =>
async (userId: string, projectId: string) => {
const [stream, existingRequest] = await Promise.all([
Expand Down Expand Up @@ -121,8 +122,11 @@ export const requestProjectAccessFactory =
resourceId: projectId
})

await deps.accessRequestsEmitter(AccessRequestsEmitter.events.Created, {
request: req
await deps.emitEvent({
eventName: AccessRequestEvents.Created,
payload: {
request: req
}
})

return req
Expand Down Expand Up @@ -168,7 +172,7 @@ export const processPendingStreamRequestFactory =
validateStreamAccess: ValidateStreamAccess
addOrUpdateStreamCollaborator: AddOrUpdateStreamCollaborator
deleteRequestById: DeleteRequestById
accessRequestsEmitter: (typeof AccessRequestsEmitter)['emit']
emitEvent: EventBusEmit
}) =>
async (
userId: string,
Expand Down Expand Up @@ -216,10 +220,13 @@ export const processPendingStreamRequestFactory =

await deps.deleteRequestById(req.id)

await deps.accessRequestsEmitter(AccessRequestsEmitter.events.Finalized, {
request: req,
approved: accept ? { role } : undefined,
finalizedBy: userId
await deps.emitEvent({
eventName: AccessRequestEvents.Finalized,
payload: {
request: req,
approved: accept ? { role } : undefined,
finalizedBy: userId
}
})

return req
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { db } from '@/db/knex'
import { AccessRequestsEmitter } from '@/modules/accessrequests/events/emitter'
import { AccessRequestEvents } from '@/modules/accessrequests/domain/events'
import {
createNewRequestFactory,
deleteRequestByIdFactory,
Expand Down Expand Up @@ -42,6 +42,7 @@ import {
} from '@/modules/core/services/streams/access'
import { NotificationType } from '@/modules/notifications/helpers/types'
import { authorizeResolver } from '@/modules/shared'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { publish } from '@/modules/shared/utils/subscriptions'
import { BasicTestUser, createTestUsers } from '@/test/authHelper'
import {
Expand Down Expand Up @@ -75,7 +76,7 @@ const requestProjectAccess = requestProjectAccessFactory({
}),
getStream,
createNewRequest: createNewRequestFactory({ db }),
accessRequestsEmitter: AccessRequestsEmitter.emit
emitEvent: getEventBus().emit
})
const saveActivity = saveActivityFactory({ db })
const validateStreamAccess = validateStreamAccessFactory({ authorizeResolver })
Expand Down Expand Up @@ -162,6 +163,8 @@ describe('Project access requests', () => {
id: ''
}

let quitters: (() => void)[] = []

before(async () => {
await cleanup()
await createTestUsers([me, otherGuy, anotherGuy])
Expand All @@ -176,6 +179,11 @@ describe('Project access requests', () => {
notificationsStateManager = buildNotificationsStateTracker()
})

afterEach(() => {
quitters.forEach((q) => q())
quitters = []
})

after(async () => {
notificationsStateManager.destroy()
})
Expand Down Expand Up @@ -226,6 +234,13 @@ describe('Project access requests', () => {
})

it('operation succeeds', async () => {
let eventFired = false
quitters.push(
getEventBus().listen(AccessRequestEvents.Created, async (payload) => {
expect(payload.payload.request.requesterId).to.eq(me.id)
eventFired = true
})
)
const sendEmailCall = EmailSendingServiceMock.hijackFunction(
'sendEmail',
async () => true
Expand Down Expand Up @@ -267,6 +282,7 @@ describe('Project access requests', () => {
userId: me.id
})
expect(streamActivity).to.have.lengthOf(1)
expect(eventFired).to.be.true
})

it('operation fails if request already exists', async () => {
Expand Down Expand Up @@ -447,6 +463,15 @@ describe('Project access requests', () => {
]
validProcessingDataSet.forEach(({ display, accept, role }) => {
it(`${display} works`, async () => {
let eventFired = false
quitters.push(
getEventBus().listen(AccessRequestEvents.Finalized, async (payload) => {
expect(!!payload.payload.approved).to.eq(accept)
expect(payload.payload.finalizedBy).to.eq(me.id)
eventFired = true
})
)

const results = await useReq(validReqId, accept, role)
expect(results).to.not.haveGraphQLErrors()
expect(results.data?.projectMutations.accessRequestMutations.use).to.be.ok
Expand Down Expand Up @@ -477,6 +502,7 @@ describe('Project access requests', () => {
})
expect(streamActivity).to.have.lengthOf(1)
}
expect(eventFired).to.be.true
})
})
})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { buildApolloServer } from '@/app'
import { db } from '@/db/knex'
import { AccessRequestsEmitter } from '@/modules/accessrequests/events/emitter'
import {
createNewRequestFactory,
deleteRequestByIdFactory,
Expand Down Expand Up @@ -44,6 +43,7 @@ import {
} from '@/modules/core/services/streams/access'
import { NotificationType } from '@/modules/notifications/helpers/types'
import { authorizeResolver } from '@/modules/shared'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { publish } from '@/modules/shared/utils/subscriptions'
import { BasicTestUser, createTestUsers } from '@/test/authHelper'
import {
Expand Down Expand Up @@ -78,7 +78,7 @@ const requestStreamAccess = requestStreamAccessFactory({
}),
getStream,
createNewRequest: createNewRequestFactory({ db }),
accessRequestsEmitter: AccessRequestsEmitter.emit
emitEvent: getEventBus().emit
})
})
const saveActivity = saveActivityFactory({ db })
Expand Down
15 changes: 6 additions & 9 deletions packages/server/modules/activitystream/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,13 @@ import {
onServerInviteCreatedFactory,
onUserCreatedFactory
} from '@/modules/activitystream/services/eventListener'
import {
AccessRequestsEmitter,
AccessRequestsEvents
} from '@/modules/accessrequests/events/emitter'
import { isProjectResourceTarget } from '@/modules/serverinvites/helpers/core'
import { publish } from '@/modules/shared/utils/subscriptions'
import { isStreamAccessRequest } from '@/modules/accessrequests/repositories'
import { ServerInvitesEvents } from '@/modules/serverinvites/domain/events'
import { ProjectEvents } from '@/modules/core/domain/projects/events'
import { UserEvents } from '@/modules/core/domain/users/events'
import { AccessRequestEvents } from '@/modules/accessrequests/domain/events'

let scheduledTask: ReturnType<ScheduleExecution> | null = null
let quitEventListeners: Optional<() => void> = undefined
Expand All @@ -62,16 +59,16 @@ const initializeEventListeners = ({
// this activity will always go in the main DB
onUserCreatedFactory({ saveActivity: saveActivityFactory({ db }) })
),
AccessRequestsEmitter.listen(AccessRequestsEvents.Created, async ({ request }) => {
if (!isStreamAccessRequest(request)) return
eventBus.listen(AccessRequestEvents.Created, async (payload) => {
if (!isStreamAccessRequest(payload.payload.request)) return
return await onServerAccessRequestCreatedFactory({
addStreamAccessRequestedActivity: addStreamAccessRequestedActivityFactory({
saveActivity: saveActivityFactory({ db })
})
})({ request })
})(payload)
}),
AccessRequestsEmitter.listen(AccessRequestsEvents.Finalized, async (payload) => {
if (!isStreamAccessRequest(payload.request)) return
eventBus.listen(AccessRequestEvents.Finalized, async (payload) => {
if (!isStreamAccessRequest(payload.payload.request)) return
await onServerAccessRequestFinalizedFactory({
addStreamAccessRequestDeclinedActivity:
addStreamAccessRequestDeclinedActivityFactory({
Expand Down
Loading