diff --git a/packages/server/modules/accessrequests/domain/events.ts b/packages/server/modules/accessrequests/domain/events.ts new file mode 100644 index 0000000000..5df619b3d8 --- /dev/null +++ b/packages/server/modules/accessrequests/domain/events.ts @@ -0,0 +1,26 @@ +import { ServerAccessRequestRecord } from '@/modules/accessrequests/repositories' +import { StreamRoles } from '@speckle/shared' + +export const accessRequestEventsNamespace = 'accessrequests' as const + +export const AccessRequestEvents = { + Created: `${accessRequestEventsNamespace}.created`, + Finalized: `${accessRequestEventsNamespace}.finalized` +} as const + +export type AccessRequestEventsPayloads = { + [AccessRequestEvents.Created]: { request: ServerAccessRequestRecord } + [AccessRequestEvents.Finalized]: { + request: ServerAccessRequestRecord + /** + * ID of the user that finalized this request + */ + finalizedBy: string + /** + * If this object is set, request was approved + */ + approved?: { + role: StreamRoles + } + } +} diff --git a/packages/server/modules/accessrequests/events/emitter.ts b/packages/server/modules/accessrequests/events/emitter.ts deleted file mode 100644 index 21c2a6aa11..0000000000 --- a/packages/server/modules/accessrequests/events/emitter.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { ServerAccessRequestRecord } from '@/modules/accessrequests/repositories' -import { StreamRoles } from '@/modules/core/helpers/mainConstants' -import { initializeModuleEventEmitter } from '@/modules/shared/services/moduleEventEmitterSetup' - -export enum AccessRequestsEvents { - Created = 'created', - Finalized = 'finalized' -} - -export type AccessRequestsEventsPayloads = { - [AccessRequestsEvents.Created]: { request: ServerAccessRequestRecord } - [AccessRequestsEvents.Finalized]: { - request: ServerAccessRequestRecord - /** - * ID of the user that finalized this request - */ - finalizedBy: string - /** - * If this object is set, request was approved - */ - approved?: { - role: StreamRoles - } - } -} - -const { emit, listen } = initializeModuleEventEmitter({ - moduleName: 'accessrequests' -}) - -export const AccessRequestsEmitter = { emit, listen, events: AccessRequestsEvents } diff --git a/packages/server/modules/accessrequests/graph/resolvers/index.ts b/packages/server/modules/accessrequests/graph/resolvers/index.ts index f0935f8a93..33b0e509e3 100644 --- a/packages/server/modules/accessrequests/graph/resolvers/index.ts +++ b/packages/server/modules/accessrequests/graph/resolvers/index.ts @@ -1,5 +1,4 @@ import { db } from '@/db/knex' -import { AccessRequestsEmitter } from '@/modules/accessrequests/events/emitter' import { AccessRequestType, createNewRequestFactory, @@ -36,6 +35,7 @@ import { } from '@/modules/core/services/streams/access' import { authorizeResolver } from '@/modules/shared' import { LogicError } from '@/modules/shared/errors' +import { getEventBus } from '@/modules/shared/services/eventBus' import { publish } from '@/modules/shared/utils/subscriptions' const getUser = getUserFactory({ db }) @@ -52,7 +52,7 @@ const requestProjectAccess = requestProjectAccessFactory({ getUserStreamAccessRequest, getStream, createNewRequest: createNewRequestFactory({ db }), - accessRequestsEmitter: AccessRequestsEmitter.emit + emitEvent: getEventBus().emit }) const requestStreamAccess = requestStreamAccessFactory({ @@ -90,7 +90,7 @@ const processPendingStreamRequest = processPendingStreamRequestFactory({ validateStreamAccess, addOrUpdateStreamCollaborator, deleteRequestById: deleteRequestByIdFactory({ db }), - accessRequestsEmitter: AccessRequestsEmitter.emit + emitEvent: getEventBus().emit }) const processPendingProjectRequest = processPendingStreamRequest diff --git a/packages/server/modules/accessrequests/index.ts b/packages/server/modules/accessrequests/index.ts index abcec1b3cc..77ae0e9d73 100644 --- a/packages/server/modules/accessrequests/index.ts +++ b/packages/server/modules/accessrequests/index.ts @@ -1,10 +1,10 @@ import { db } from '@/db/knex' import { moduleLogger } from '@/logging/logging' -import { AccessRequestsEmitter } from '@/modules/accessrequests/events/emitter' import { initializeEventListenerFactory } from '@/modules/accessrequests/services/eventListener' import { getStreamCollaboratorsFactory } from '@/modules/core/repositories/streams' import { publishNotification } from '@/modules/notifications/services/publication' import { Optional, SpeckleModule } from '@/modules/shared/helpers/typeHelper' +import { getEventBus } from '@/modules/shared/services/eventBus' let quitListeners: Optional<() => void> = undefined @@ -16,7 +16,7 @@ const ServerAccessRequestsModule: SpeckleModule = { const initializeEventListener = initializeEventListenerFactory({ getStreamCollaborators: getStreamCollaboratorsFactory({ db }), publishNotification, - accessRequestsEventListener: AccessRequestsEmitter.listen + eventBus: getEventBus() }) quitListeners = initializeEventListener() } diff --git a/packages/server/modules/accessrequests/services/eventListener.ts b/packages/server/modules/accessrequests/services/eventListener.ts index 9f16ed0b39..48680f55a1 100644 --- a/packages/server/modules/accessrequests/services/eventListener.ts +++ b/packages/server/modules/accessrequests/services/eventListener.ts @@ -1,8 +1,4 @@ -import { - AccessRequestsEmitter, - AccessRequestsEvents, - AccessRequestsEventsPayloads -} from '@/modules/accessrequests/events/emitter' +import { AccessRequestEvents } from '@/modules/accessrequests/domain/events' import { isStreamAccessRequest } from '@/modules/accessrequests/repositories' import { GetStreamCollaborators } from '@/modules/core/domain/streams/operations' import { Roles } from '@/modules/core/helpers/mainConstants' @@ -10,6 +6,7 @@ import { NotificationPublisher, NotificationType } from '@/modules/notifications/helpers/types' +import { EventBus, EventPayload } from '@/modules/shared/services/eventBus' type OnServerAccessRequestCreatedDeps = { getStreamCollaborators: GetStreamCollaborators @@ -18,8 +15,8 @@ type OnServerAccessRequestCreatedDeps = { const onServerAccessRequestCreatedFactory = (deps: OnServerAccessRequestCreatedDeps) => - async (payload: AccessRequestsEventsPayloads[AccessRequestsEvents.Created]) => { - const { request } = payload + async (payload: EventPayload) => { + const { request } = payload.payload // Send out email to all owners of the stream if (isStreamAccessRequest(request)) { @@ -46,8 +43,8 @@ type OnServerAccessRequestFinalizedDeps = { const onServerAccessRequestFinalizedFactory = (deps: OnServerAccessRequestFinalizedDeps) => - async (payload: AccessRequestsEventsPayloads[AccessRequestsEvents.Finalized]) => { - const { approved, request, finalizedBy } = payload + async (payload: EventPayload) => { + const { approved, request, finalizedBy } = payload.payload // Send out email to requester, if accepted if (approved && isStreamAccessRequest(request)) { @@ -67,7 +64,7 @@ const onServerAccessRequestFinalizedFactory = export const initializeEventListenerFactory = ( deps: { - accessRequestsEventListener: (typeof AccessRequestsEmitter)['listen'] + eventBus: EventBus } & OnServerAccessRequestCreatedDeps & OnServerAccessRequestFinalizedDeps ) => @@ -76,12 +73,9 @@ export const initializeEventListenerFactory = const onServerAccessRequestFinalized = onServerAccessRequestFinalizedFactory(deps) const quitCbs = [ - deps.accessRequestsEventListener( - AccessRequestsEvents.Created, - onServerAccessRequestCreated - ), - deps.accessRequestsEventListener( - AccessRequestsEvents.Finalized, + deps.eventBus.listen(AccessRequestEvents.Created, onServerAccessRequestCreated), + deps.eventBus.listen( + AccessRequestEvents.Finalized, onServerAccessRequestFinalized ) ] diff --git a/packages/server/modules/accessrequests/services/stream.ts b/packages/server/modules/accessrequests/services/stream.ts index 10c9a37599..7e134bcb9e 100644 --- a/packages/server/modules/accessrequests/services/stream.ts +++ b/packages/server/modules/accessrequests/services/stream.ts @@ -2,7 +2,6 @@ import { AccessRequestCreationError, AccessRequestProcessingError } from '@/modules/accessrequests/errors' -import { AccessRequestsEmitter } from '@/modules/accessrequests/events/emitter' import { StreamAccessRequestGraphQLReturn } from '@/modules/accessrequests/helpers/graphTypes' import { AccessRequestType, @@ -35,6 +34,8 @@ import { GetStream, ValidateStreamAccess } from '@/modules/core/domain/streams/operations' +import { EventBusEmit } from '@/modules/shared/services/eventBus' +import { AccessRequestEvents } from '@/modules/accessrequests/domain/events' function buildStreamAccessRequestGraphQLReturn( record: ServerAccessRequestRecord @@ -85,7 +86,7 @@ export const requestProjectAccessFactory = getUserStreamAccessRequest: GetUserStreamAccessRequest getStream: GetStream createNewRequest: CreateNewRequest - accessRequestsEmitter: (typeof AccessRequestsEmitter)['emit'] + emitEvent: EventBusEmit }): RequestProjectAccess => async (userId: string, projectId: string) => { const [stream, existingRequest] = await Promise.all([ @@ -121,8 +122,11 @@ export const requestProjectAccessFactory = resourceId: projectId }) - await deps.accessRequestsEmitter(AccessRequestsEmitter.events.Created, { - request: req + await deps.emitEvent({ + eventName: AccessRequestEvents.Created, + payload: { + request: req + } }) return req @@ -168,7 +172,7 @@ export const processPendingStreamRequestFactory = validateStreamAccess: ValidateStreamAccess addOrUpdateStreamCollaborator: AddOrUpdateStreamCollaborator deleteRequestById: DeleteRequestById - accessRequestsEmitter: (typeof AccessRequestsEmitter)['emit'] + emitEvent: EventBusEmit }) => async ( userId: string, @@ -216,10 +220,13 @@ export const processPendingStreamRequestFactory = await deps.deleteRequestById(req.id) - await deps.accessRequestsEmitter(AccessRequestsEmitter.events.Finalized, { - request: req, - approved: accept ? { role } : undefined, - finalizedBy: userId + await deps.emitEvent({ + eventName: AccessRequestEvents.Finalized, + payload: { + request: req, + approved: accept ? { role } : undefined, + finalizedBy: userId + } }) return req diff --git a/packages/server/modules/accessrequests/tests/projectAccessRequests.spec.ts b/packages/server/modules/accessrequests/tests/projectAccessRequests.spec.ts index edb8a658e2..299ddc0744 100644 --- a/packages/server/modules/accessrequests/tests/projectAccessRequests.spec.ts +++ b/packages/server/modules/accessrequests/tests/projectAccessRequests.spec.ts @@ -1,5 +1,5 @@ import { db } from '@/db/knex' -import { AccessRequestsEmitter } from '@/modules/accessrequests/events/emitter' +import { AccessRequestEvents } from '@/modules/accessrequests/domain/events' import { createNewRequestFactory, deleteRequestByIdFactory, @@ -42,6 +42,7 @@ import { } from '@/modules/core/services/streams/access' import { NotificationType } from '@/modules/notifications/helpers/types' import { authorizeResolver } from '@/modules/shared' +import { getEventBus } from '@/modules/shared/services/eventBus' import { publish } from '@/modules/shared/utils/subscriptions' import { BasicTestUser, createTestUsers } from '@/test/authHelper' import { @@ -75,7 +76,7 @@ const requestProjectAccess = requestProjectAccessFactory({ }), getStream, createNewRequest: createNewRequestFactory({ db }), - accessRequestsEmitter: AccessRequestsEmitter.emit + emitEvent: getEventBus().emit }) const saveActivity = saveActivityFactory({ db }) const validateStreamAccess = validateStreamAccessFactory({ authorizeResolver }) @@ -162,6 +163,8 @@ describe('Project access requests', () => { id: '' } + let quitters: (() => void)[] = [] + before(async () => { await cleanup() await createTestUsers([me, otherGuy, anotherGuy]) @@ -176,6 +179,11 @@ describe('Project access requests', () => { notificationsStateManager = buildNotificationsStateTracker() }) + afterEach(() => { + quitters.forEach((q) => q()) + quitters = [] + }) + after(async () => { notificationsStateManager.destroy() }) @@ -226,6 +234,13 @@ describe('Project access requests', () => { }) it('operation succeeds', async () => { + let eventFired = false + quitters.push( + getEventBus().listen(AccessRequestEvents.Created, async (payload) => { + expect(payload.payload.request.requesterId).to.eq(me.id) + eventFired = true + }) + ) const sendEmailCall = EmailSendingServiceMock.hijackFunction( 'sendEmail', async () => true @@ -267,6 +282,7 @@ describe('Project access requests', () => { userId: me.id }) expect(streamActivity).to.have.lengthOf(1) + expect(eventFired).to.be.true }) it('operation fails if request already exists', async () => { @@ -447,6 +463,15 @@ describe('Project access requests', () => { ] validProcessingDataSet.forEach(({ display, accept, role }) => { it(`${display} works`, async () => { + let eventFired = false + quitters.push( + getEventBus().listen(AccessRequestEvents.Finalized, async (payload) => { + expect(!!payload.payload.approved).to.eq(accept) + expect(payload.payload.finalizedBy).to.eq(me.id) + eventFired = true + }) + ) + const results = await useReq(validReqId, accept, role) expect(results).to.not.haveGraphQLErrors() expect(results.data?.projectMutations.accessRequestMutations.use).to.be.ok @@ -477,6 +502,7 @@ describe('Project access requests', () => { }) expect(streamActivity).to.have.lengthOf(1) } + expect(eventFired).to.be.true }) }) }) diff --git a/packages/server/modules/accessrequests/tests/streamAccessRequests.spec.ts b/packages/server/modules/accessrequests/tests/streamAccessRequests.spec.ts index 3806adb469..e9f9a22669 100644 --- a/packages/server/modules/accessrequests/tests/streamAccessRequests.spec.ts +++ b/packages/server/modules/accessrequests/tests/streamAccessRequests.spec.ts @@ -1,6 +1,5 @@ import { buildApolloServer } from '@/app' import { db } from '@/db/knex' -import { AccessRequestsEmitter } from '@/modules/accessrequests/events/emitter' import { createNewRequestFactory, deleteRequestByIdFactory, @@ -44,6 +43,7 @@ import { } from '@/modules/core/services/streams/access' import { NotificationType } from '@/modules/notifications/helpers/types' import { authorizeResolver } from '@/modules/shared' +import { getEventBus } from '@/modules/shared/services/eventBus' import { publish } from '@/modules/shared/utils/subscriptions' import { BasicTestUser, createTestUsers } from '@/test/authHelper' import { @@ -78,7 +78,7 @@ const requestStreamAccess = requestStreamAccessFactory({ }), getStream, createNewRequest: createNewRequestFactory({ db }), - accessRequestsEmitter: AccessRequestsEmitter.emit + emitEvent: getEventBus().emit }) }) const saveActivity = saveActivityFactory({ db }) diff --git a/packages/server/modules/activitystream/index.ts b/packages/server/modules/activitystream/index.ts index cbb8c74ab0..6f66e6c2dc 100644 --- a/packages/server/modules/activitystream/index.ts +++ b/packages/server/modules/activitystream/index.ts @@ -31,16 +31,13 @@ import { onServerInviteCreatedFactory, onUserCreatedFactory } from '@/modules/activitystream/services/eventListener' -import { - AccessRequestsEmitter, - AccessRequestsEvents -} from '@/modules/accessrequests/events/emitter' import { isProjectResourceTarget } from '@/modules/serverinvites/helpers/core' import { publish } from '@/modules/shared/utils/subscriptions' import { isStreamAccessRequest } from '@/modules/accessrequests/repositories' import { ServerInvitesEvents } from '@/modules/serverinvites/domain/events' import { ProjectEvents } from '@/modules/core/domain/projects/events' import { UserEvents } from '@/modules/core/domain/users/events' +import { AccessRequestEvents } from '@/modules/accessrequests/domain/events' let scheduledTask: ReturnType | null = null let quitEventListeners: Optional<() => void> = undefined @@ -62,16 +59,16 @@ const initializeEventListeners = ({ // this activity will always go in the main DB onUserCreatedFactory({ saveActivity: saveActivityFactory({ db }) }) ), - AccessRequestsEmitter.listen(AccessRequestsEvents.Created, async ({ request }) => { - if (!isStreamAccessRequest(request)) return + eventBus.listen(AccessRequestEvents.Created, async (payload) => { + if (!isStreamAccessRequest(payload.payload.request)) return return await onServerAccessRequestCreatedFactory({ addStreamAccessRequestedActivity: addStreamAccessRequestedActivityFactory({ saveActivity: saveActivityFactory({ db }) }) - })({ request }) + })(payload) }), - AccessRequestsEmitter.listen(AccessRequestsEvents.Finalized, async (payload) => { - if (!isStreamAccessRequest(payload.request)) return + eventBus.listen(AccessRequestEvents.Finalized, async (payload) => { + if (!isStreamAccessRequest(payload.payload.request)) return await onServerAccessRequestFinalizedFactory({ addStreamAccessRequestDeclinedActivity: addStreamAccessRequestDeclinedActivityFactory({ diff --git a/packages/server/modules/activitystream/services/eventListener.ts b/packages/server/modules/activitystream/services/eventListener.ts index 209d26d9fb..0ac74dba43 100644 --- a/packages/server/modules/activitystream/services/eventListener.ts +++ b/packages/server/modules/activitystream/services/eventListener.ts @@ -1,8 +1,5 @@ import { Logger } from '@/logging/logging' -import { - AccessRequestsEvents, - AccessRequestsEventsPayloads -} from '@/modules/accessrequests/events/emitter' +import { AccessRequestEvents } from '@/modules/accessrequests/domain/events' import { AccessRequestType, isStreamAccessRequest @@ -47,11 +44,11 @@ export const onServerAccessRequestCreatedFactory = }: { addStreamAccessRequestedActivity: AddStreamAccessRequestedActivity }) => - async (payload: AccessRequestsEventsPayloads[AccessRequestsEvents.Created]) => { + async (payload: EventPayload) => { const { request: { resourceId, requesterId } - } = payload - if (!isStreamAccessRequest(payload.request)) return + } = payload.payload + if (!isStreamAccessRequest(payload.payload.request)) return if (!resourceId) return await addStreamAccessRequestedActivity({ @@ -66,12 +63,12 @@ export const onServerAccessRequestFinalizedFactory = }: { addStreamAccessRequestDeclinedActivity: AddStreamAccessRequestDeclinedActivity }) => - async (payload: AccessRequestsEventsPayloads[AccessRequestsEvents.Finalized]) => { + async (payload: EventPayload) => { const { approved, finalizedBy, request: { resourceId, resourceType, requesterId } - } = payload + } = payload.payload if (!resourceId) return if (resourceType === AccessRequestType.Stream) { diff --git a/packages/server/modules/shared/services/eventBus.ts b/packages/server/modules/shared/services/eventBus.ts index 06aeb595ab..e4e4d3d0f5 100644 --- a/packages/server/modules/shared/services/eventBus.ts +++ b/packages/server/modules/shared/services/eventBus.ts @@ -31,6 +31,10 @@ import { versionEventsNamespace, VersionEventsPayloads } from '@/modules/core/domain/commits/events' +import { + accessRequestEventsNamespace, + AccessRequestEventsPayloads +} from '@/modules/accessrequests/domain/events' type AllEventsWildcard = '**' type EventWildcard = '*' @@ -55,6 +59,7 @@ type EventsByNamespace = { [projectEventsNamespace]: ProjectEventsPayloads [userEventsNamespace]: UserEventsPayloads [versionEventsNamespace]: VersionEventsPayloads + [accessRequestEventsNamespace]: AccessRequestEventsPayloads } type EventTypes = UnionToIntersection