Skip to content

Commit

Permalink
feat(server): multi region blob support (#3653)
Browse files Browse the repository at this point in the history
Co-authored-by: Iain Sproat <[email protected]>
  • Loading branch information
fabis94 and iainsproat authored Dec 10, 2024
1 parent 510a079 commit 8d0cbad
Show file tree
Hide file tree
Showing 65 changed files with 878 additions and 452 deletions.
4 changes: 3 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -597,7 +597,9 @@ jobs:
POSTGRES_USER: speckle
command: -c 'max_connections=1000' -c 'port=5433' -c 'wal_level=logical'
- image: 'minio/minio'
command: server /data --console-address ":9001"
command: server /data --console-address ":9001" --address "0.0.0.0:9000"
- image: 'minio/minio'
command: server /data --console-address ":9021" --address "0.0.0.0:9020"
environment:
# Same as test-server:
NODE_ENV: test
Expand Down
16 changes: 16 additions & 0 deletions .circleci/multiregion.test-ci.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,28 @@
"main": {
"postgres": {
"connectionUri": "postgresql://speckle:[email protected]:5432/speckle2_test"
},
"blobStorage": {
"accessKey": "minioadmin",
"secretKey": "minioadmin",
"bucket": "speckle-server",
"createBucketIfNotExists": true,
"endpoint": "http://127.0.0.1:9000",
"s3Region": "us-east-1"
}
},
"regions": {
"region1": {
"postgres": {
"connectionUri": "postgresql://speckle:[email protected]:5433/speckle2_test"
},
"blobStorage": {
"accessKey": "minioadmin",
"secretKey": "minioadmin",
"bucket": "speckle-server",
"createBucketIfNotExists": true,
"endpoint": "http://127.0.0.1:9020",
"s3Region": "us-east-1"
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ EMAIL_PORT="1025"

The web portal is available at `localhost:1080` and it's listening for mail on port `1025`.

### Minio (S3 storage)

Default credentials are: `minioadmin:minioadmin`
Main storage Web UI: [http://localhost:9001/](http://localhost:9001/)
Region1 storage Web UI: [http://localhost:9021/](http://localhost:9021/)

You can use the web UI to validate uploaded blobs

# Contributing

Please make sure you read the [contribution guidelines](https://github.com/specklesystems/speckle-server/blob/main/CONTRIBUTING.md) for an overview of the best practices we try to follow.
Expand Down
1 change: 1 addition & 0 deletions packages/server/db/migrations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ export const migrateDbToLatest = async (params: { db: Knex; region: string }) =>
await db.migrate.latest()
} catch (err: unknown) {
logger.error({ err, region }, 'Error migrating db to latest for region "{region}".')
throw err
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { Knex } from 'knex'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { getUserFactory } from '@/modules/core/repositories/users'
import { getServerInfoFactory } from '@/modules/core/repositories/server'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'

const tables = {
streamActivity: <T extends object = StreamActivityRecord>(db: Knex) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ import {
storeUserServerAppTokenFactory
} from '@/modules/core/repositories/tokens'
import { getEventBus } from '@/modules/shared/services/eventBus'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'

const { FF_AUTOMATE_MODULE_ENABLED } = getFeatureFlags()

Expand Down
2 changes: 1 addition & 1 deletion packages/server/modules/automate/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import {
storeTokenScopesFactory,
storeUserServerAppTokenFactory
} from '@/modules/core/repositories/tokens'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import {
ProjectAutomationsUpdatedMessageType,
ProjectTriggeredAutomationsStatusUpdatedMessageType
Expand Down
2 changes: 1 addition & 1 deletion packages/server/modules/automate/rest/logStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { ExecutionEngineFailedResponseError } from '@/modules/automate/errors/ex
import { getAutomationRunWithTokenFactory } from '@/modules/automate/repositories/automations'
import { corsMiddleware } from '@/modules/core/configs/cors'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import {
validateRequiredStreamFactory,
validateResourceAccess,
Expand Down
59 changes: 59 additions & 0 deletions packages/server/modules/blobstorage/clients/objectStorage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import {
getS3AccessKey,
getS3BucketName,
getS3Endpoint,
getS3Region,
getS3SecretKey
} from '@/modules/shared/helpers/envHelper'
import { S3Client, S3ClientConfig } from '@aws-sdk/client-s3'
import { Optional } from '@speckle/shared'

export type ObjectStorage = {
client: S3Client
bucket: string
}

export type GetObjectStorageParams = {
credentials: S3ClientConfig['credentials']
endpoint: S3ClientConfig['endpoint']
region: S3ClientConfig['region']
bucket: string
}

/**
* Get object storage client
*/
export const getObjectStorage = (params: GetObjectStorageParams): ObjectStorage => {
const { bucket, credentials, endpoint, region } = params

const config: S3ClientConfig = {
credentials,
endpoint,
region,
forcePathStyle: true
}
const client = new S3Client(config)
return { client, bucket }
}

let mainObjectStorage: Optional<ObjectStorage> = undefined

/**
* Get main object storage client
*/
export const getMainObjectStorage = (): ObjectStorage => {
if (mainObjectStorage) return mainObjectStorage

const mainParams: GetObjectStorageParams = {
credentials: {
accessKeyId: getS3AccessKey(),
secretAccessKey: getS3SecretKey()
},
endpoint: getS3Endpoint(),
region: getS3Region(),
bucket: getS3BucketName()
}

mainObjectStorage = getObjectStorage(mainParams)
return mainObjectStorage
}
12 changes: 4 additions & 8 deletions packages/server/modules/blobstorage/domain/operations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
} from '@/modules/blobstorage/domain/types'
import { MaybeNullOrUndefined, Nullable } from '@speckle/shared'
import type { Readable } from 'stream'
import { StoreFileStream } from '@/modules/blobstorage/domain/storageOperations'

export type GetBlobs = (params: {
streamId?: MaybeNullOrUndefined<string>
Expand Down Expand Up @@ -33,21 +34,16 @@ export type GetBlobMetadataCollection = (params: {
}) => Promise<{ blobs: BlobStorageItem[]; cursor: Nullable<string> }>

export type UploadFileStream = (
params1: {
streamData: {
streamId: string
userId: string | undefined
},
params2: {
blobData: {
blobId: string
fileName: string
fileType: string | undefined
fileStream: Readable | Buffer
}
) => Promise<{ blobId: string; fileName: string; fileHash: string }>

type FileStream = string | Blob | Readable | Uint8Array | Buffer

export type StoreFileStream = (args: {
objectKey: string
fileStream: FileStream
}) => Promise<{ fileHash: string }>
export { StoreFileStream }
23 changes: 23 additions & 0 deletions packages/server/modules/blobstorage/domain/storageOperations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import type stream from 'stream'
import type { Readable } from 'stream'

export type GetObjectStream = (params: {
objectKey: string
}) => Promise<stream.Readable>

export type GetObjectAttributes = (params: { objectKey: string }) => Promise<{
fileSize: number
}>

type FileStream = string | Blob | Readable | Uint8Array | Buffer

export type StoreFileStream = (args: {
objectKey: string
fileStream: FileStream
}) => Promise<{ fileHash: string }>

export type DeleteObject = (params: { objectKey: string }) => Promise<void>

export type EnsureStorageAccess = (params: {
createBucketIfNotExists: boolean
}) => Promise<void>
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
StreamBlobsArgs
} from '@/modules/core/graph/generated/graphql'
import { StreamGraphQLReturn } from '@/modules/core/helpers/graphTypes'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import {
BadRequestError,
NotFoundError,
Expand Down
57 changes: 44 additions & 13 deletions packages/server/modules/blobstorage/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,6 @@ import {
streamWritePermissionsPipelineFactory,
streamReadPermissionsPipelineFactory
} from '@/modules/shared/authz'
import {
ensureStorageAccess,
storeFileStream,
getObjectStream,
deleteObject,
getObjectAttributes
} from '@/modules/blobstorage/objectStorage'
import crs from 'crypto-random-string'
import { authMiddlewareCreator } from '@/modules/shared/middleware'
import { isArray } from 'lodash'
Expand Down Expand Up @@ -42,20 +35,36 @@ import {
fullyDeleteBlobFactory
} from '@/modules/blobstorage/services/management'
import { getRolesFactory } from '@/modules/shared/repositories/roles'
import { adminOverrideEnabled } from '@/modules/shared/helpers/envHelper'
import {
adminOverrideEnabled,
createS3Bucket
} from '@/modules/shared/helpers/envHelper'
import { getStreamFactory } from '@/modules/core/repositories/streams'
import { Request, Response } from 'express'
import { ensureError } from '@speckle/shared'
import { SpeckleModule } from '@/modules/shared/helpers/typeHelper'
import { getProjectDbClient } from '@/modules/multiregion/dbSelector'
import { getProjectDbClient } from '@/modules/multiregion/utils/dbSelector'
import {
deleteObjectFactory,
ensureStorageAccessFactory,
getObjectAttributesFactory,
getObjectStreamFactory,
storeFileStreamFactory
} from '@/modules/blobstorage/repositories/blobs'
import { getMainObjectStorage } from '@/modules/blobstorage/clients/objectStorage'
import { getProjectObjectStorage } from '@/modules/multiregion/utils/blobStorageSelector'

const ensureConditions = async () => {
if (process.env.DISABLE_FILE_UPLOADS) {
moduleLogger.info('📦 Blob storage is DISABLED')
return
} else {
moduleLogger.info('📦 Init BlobStorage module')
await ensureStorageAccess()
const storage = getMainObjectStorage()
const ensureStorageAccess = ensureStorageAccessFactory({ storage })
await ensureStorageAccess({
createBucketIfNotExists: createS3Bucket()
})
}

if (!process.env.S3_BUCKET) {
Expand Down Expand Up @@ -125,8 +134,12 @@ export const init: SpeckleModule['init'] = async (app) => {
limits: { fileSize: getFileSizeLimit() }
})

const projectDb = await getProjectDbClient({ projectId: streamId })
const [projectDb, projectStorage] = await Promise.all([
getProjectDbClient({ projectId: streamId }),
getProjectObjectStorage({ projectId: streamId })
])

const storeFileStream = storeFileStreamFactory({ storage: projectStorage })
const updateBlob = updateBlobFactory({ db: projectDb })
const getBlobMetadata = getBlobMetadataFactory({ db: projectDb })

Expand All @@ -146,6 +159,11 @@ export const init: SpeckleModule['init'] = async (app) => {
updateBlob
})

const getObjectAttributes = getObjectAttributesFactory({
storage: projectStorage
})
const deleteObject = deleteObjectFactory({ storage: projectStorage })

busboy.on('file', (formKey, file, info) => {
const { filename: fileName } = info
const fileType = fileName?.split('.')?.pop()?.toLowerCase()
Expand Down Expand Up @@ -275,9 +293,15 @@ export const init: SpeckleModule['init'] = async (app) => {
},
async (req, res) => {
errorHandler(req, res, async (req, res) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
const streamId = req.params.streamId
const [projectDb, projectStorage] = await Promise.all([
getProjectDbClient({ projectId: streamId }),
getProjectObjectStorage({ projectId: streamId })
])

const getBlobMetadata = getBlobMetadataFactory({ db: projectDb })
const getFileStream = getFileStreamFactory({ getBlobMetadata })
const getObjectStream = getObjectStreamFactory({ storage: projectStorage })

const { fileName } = await getBlobMetadata({
streamId: req.params.streamId,
Expand All @@ -304,12 +328,19 @@ export const init: SpeckleModule['init'] = async (app) => {
},
async (req, res) => {
errorHandler(req, res, async (req, res) => {
const projectDb = await getProjectDbClient({ projectId: req.params.streamId })
const streamId = req.params.streamId
const [projectDb, projectStorage] = await Promise.all([
getProjectDbClient({ projectId: streamId }),
getProjectObjectStorage({ projectId: streamId })
])

const getBlobMetadata = getBlobMetadataFactory({ db: projectDb })
const deleteBlob = fullyDeleteBlobFactory({
getBlobMetadata,
deleteBlob: deleteBlobFactory({ db: projectDb })
})
const deleteObject = deleteObjectFactory({ storage: projectStorage })

await deleteBlob({
streamId: req.params.streamId,
blobId: req.params.blobId,
Expand Down
Loading

0 comments on commit 8d0cbad

Please sign in to comment.