feat: update exporter #4327

Merged: 8 commits, Aug 29, 2024
Changes shown are from 2 of the 8 commits.

40 changes: 40 additions & 0 deletions packages/api/src/jobs/export.ts
@@ -0,0 +1,40 @@
import axios from 'axios'
import jwt from 'jsonwebtoken'
import { env } from '../env'
import { findActiveUser } from '../services/user'
import { logger } from '../utils/logger'

export interface ExportJobData {
  userId: string
}

export const EXPORT_JOB_NAME = 'export'

export const exportJob = async (jobData: ExportJobData) => {

Review comment (Contributor Author): @jacksonh another question is if we should just use async jobs to export the data instead of calling the cloud run service.

Reply (Contributor): @sywhb maybe we should try to start with an async job and get a baseline of performance in demo. Both our accounts are quite large there, so it would give an idea.

  const { userId } = jobData
  const user = await findActiveUser(userId)
  if (!user) {
    logger.error('user not found', {
      userId,
    })
    return
  }

  logger.info('exporting all items...', {
    userId,
  })

  const token = jwt.sign(
    {
      uid: userId,
    },
    env.server.jwtSecret,
    { expiresIn: '1d' }
  )

  await axios.post(env.queue.exportTaskHandlerUrl, undefined, {
    headers: {
      OmnivoreAuthorizationHeader: token,
    },
  })
}
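
The job above signs a short-lived JWT containing the user id and POSTs it to the export task handler in an OmnivoreAuthorizationHeader header. The handler side is not part of this diff; the following is a minimal sketch of how such a handler could validate the token, assuming an Express endpoint that shares the same JWT secret as the API. The route path, env variable name, and response shape are assumptions for illustration only.

import express from 'express'
import jwt from 'jsonwebtoken'

// Hypothetical handler-side check; the real export-handler code is not shown in this PR.
const app = express()

app.post('/', (req, res) => {
  // Express header lookup is case-insensitive, so this matches OmnivoreAuthorizationHeader.
  const token = req.header('OmnivoreAuthorizationHeader')
  if (!token) {
    return res.status(401).send({ error: 'UNAUTHORIZED' })
  }

  try {
    // Must be verified with the same secret used by jwt.sign in exportJob.
    const claims = jwt.verify(token, process.env.JWT_SECRET ?? '') as { uid: string }
    // ...kick off the export for claims.uid here...
    return res.send({ started: true, uid: claims.uid })
  } catch (err) {
    return res.status(401).send({ error: 'INVALID_TOKEN' })
  }
})

app.listen(8080)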
58 changes: 58 additions & 0 deletions packages/api/src/routers/export_router.ts
@@ -0,0 +1,58 @@
import cors from 'cors'
import express, { Router } from 'express'
import { getClaimsByToken, getTokenByRequest } from '../utils/auth'
import { corsConfig } from '../utils/corsConfig'
import { queueExportJob } from '../utils/createTask'
import { logger } from '../utils/logger'

export function exportRouter() {
  const router = Router()

  // eslint-disable-next-line @typescript-eslint/no-misused-promises
  router.get('/', cors<express.Request>(corsConfig), async (req, res) => {
    const token = getTokenByRequest(req)
    // get claims from token
    const claims = await getClaimsByToken(token)
    if (!claims) {
      logger.error('Token not found')
      return res.status(401).send({
        error: 'UNAUTHORIZED',
      })
    }

    // get user by uid from claims
    const userId = claims.uid

    try {
      const job = await queueExportJob(userId)

      if (!job) {
        logger.error('Failed to queue export job', {
          userId,
        })
        return res.status(500).send({
          error: 'INTERNAL_ERROR',
        })
      }

      logger.info('Export job queued', {
        userId,
        jobId: job.id,
      })

      res.send({
        jobId: job.id,
      })
    } catch (error) {
      logger.error('Error exporting all items', {
        userId,
        error,
      })
      return res.status(500).send({
        error: 'INTERNAL_ERROR',
      })
    }
  })

  return router
}
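
For reference, triggering an export from a client is a single authenticated GET request to this router. The sketch below assumes the router is mounted at /api/export and that the session token is accepted as a bearer Authorization header; the actual mount path and token transport depend on how exportRouter() is wired up and on getTokenByRequest.

// Hypothetical client call; adjust the URL and auth transport to match the deployed API.
const triggerExport = async (baseUrl: string, authToken: string): Promise<string> => {
  const response = await fetch(`${baseUrl}/api/export`, {
    headers: { Authorization: `Bearer ${authToken}` },
  })
  if (!response.ok) {
    throw new Error(`Export request failed with status ${response.status}`)
  }
  // On success the router responds with the id of the queued export job.
  const { jobId } = (await response.json()) as { jobId: string }
  return jobId
}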
3 changes: 3 additions & 0 deletions packages/api/src/util.ts
@@ -87,6 +87,7 @@ export interface BackendEnv {
    integrationExporterUrl: string
    integrationImporterUrl: string
    importerMetricsUrl: string
    exportTaskHandlerUrl: string
  }
  fileUpload: {
    gcsUploadBucket: string
@@ -199,6 +200,7 @@ const nullableEnvVars = [
  'INTERCOM_WEB_SECRET',
  'INTERCOM_IOS_SECRET',
  'INTERCOM_ANDROID_SECRET',
  'EXPORT_TASK_HANDLER_URL',
] // Allow some vars to be null/empty

const envParser =
@@ -300,6 +302,7 @@ export function getEnv(): BackendEnv {
    integrationExporterUrl: parse('INTEGRATION_EXPORTER_URL'),
    integrationImporterUrl: parse('INTEGRATION_IMPORTER_URL'),
    importerMetricsUrl: parse('IMPORTER_METRICS_COLLECTOR_URL'),
    exportTaskHandlerUrl: parse('EXPORT_TASK_HANDLER_URL'),
  }
  const imageProxy = {
    url: parse('IMAGE_PROXY_URL'),
23 changes: 21 additions & 2 deletions packages/api/src/utils/createTask.ts
Expand Up @@ -29,6 +29,7 @@ import { BulkActionData, BULK_ACTION_JOB_NAME } from '../jobs/bulk_action'
import { CallWebhookJobData, CALL_WEBHOOK_JOB_NAME } from '../jobs/call_webhook'
import { SendEmailJobData, SEND_EMAIL_JOB } from '../jobs/email/send_email'
import { EXPIRE_FOLDERS_JOB_NAME } from '../jobs/expire_folders'
import { EXPORT_JOB_NAME } from '../jobs/export'
import { THUMBNAIL_JOB } from '../jobs/find_thumbnail'
import { GENERATE_PREVIEW_CONTENT_JOB } from '../jobs/generate_preview_content'
import { EXPORT_ALL_ITEMS_JOB_NAME } from '../jobs/integration/export_all_items'
@@ -113,14 +114,13 @@ export const getJobPriority = (jobName: string): number => {
    case THUMBNAIL_JOB:
      return 10
    case `${REFRESH_FEED_JOB_NAME}_low`:
    case EXPORT_ITEM_JOB_NAME:
    case CREATE_DIGEST_JOB:
      return 50
    case EXPORT_ALL_ITEMS_JOB_NAME:
    case REFRESH_ALL_FEEDS_JOB_NAME:
    case GENERATE_PREVIEW_CONTENT_JOB:
    case PRUNE_TRASH_JOB:
    case EXPIRE_FOLDERS_JOB_NAME:
    case EXPORT_JOB_NAME:
      return 100

    default:
@@ -1073,4 +1073,23 @@ export const enqueueExpireFoldersJob = async () => {
  )
}

export const queueExportJob = async (userId: string) => {
  const queue = await getQueue()
  if (!queue) {
    return undefined
  }

  return queue.add(
    EXPORT_JOB_NAME,
    { userId },
    {
      jobId: `${EXPORT_JOB_NAME}_${userId}_${JOB_VERSION}`,
      removeOnComplete: true,
      removeOnFail: true,

Review comment on lines +1086 to +1088 (Contributor Author): The export jobs are deduplicated, but we probably also want to limit the number of exports per user per day by checking the number of zip files in cloud storage. (A sketch of one possible daily limit follows this file's diff.)

Reply (Contributor): Yeah agreed, or even create an entry in postgres, as annoying as that can be.

Reply (Contributor Author): Yeah, makes sense to me.

      priority: getJobPriority(EXPORT_JOB_NAME),
      attempts: 1,
    }
  )
}

export default createHttpTaskWithToken
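
The review thread above suggests capping how many exports a user can start per day; nothing in this PR implements that yet. Below is a sketch of one possible guard around queueExportJob, assuming a hypothetical countRecentExports helper backed by either cloud storage listings or a Postgres table, as discussed in the comments.

// Assumed helper, not part of this PR: how many exports this user started in the last
// 24 hours, e.g. by counting zip files in cloud storage or rows in a Postgres table.
declare function countRecentExports(userId: string): Promise<number>

const MAX_EXPORTS_PER_DAY = 3 // illustrative limit, not taken from the source

export const queueExportJobWithLimit = async (userId: string) => {
  const recentExports = await countRecentExports(userId)
  if (recentExports >= MAX_EXPORTS_PER_DAY) {
    // The caller (e.g. the export router) could translate this into a 429 response.
    return undefined
  }
  return queueExportJob(userId)
}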
2 changes: 2 additions & 0 deletions packages/export-handler/package.json
@@ -18,6 +18,7 @@
"dev": "concurrently \"tsc -w\" \"nodemon --watch ./build/ --exec npm run start\""
},
"devDependencies": {
"@types/archiver": "^6.0.2",
"@types/chai": "^4.3.4",
"@types/mocha": "^10.0.1",
"eslint-plugin-prettier": "^4.0.0"
@@ -28,6 +29,7 @@
"@omnivore-app/api": "^1.0.4",
"@omnivore/utils": "1.0.0",
"@sentry/serverless": "^7.77.0",
"archiver": "^7.0.1",
"csv-stringify": "^6.4.0",
"dotenv": "^16.0.1",
"jsonwebtoken": "^8.5.1",
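
The new archiver dependency (and its type definitions) suggests the export handler bundles exported items into a zip archive, with csv-stringify already available for the item listing. The handler implementation is not shown in this diff; the following is a minimal sketch of how the two libraries could be combined, with the item shape and file names as assumptions.

import archiver from 'archiver'
import { createWriteStream } from 'fs'
import { stringify } from 'csv-stringify/sync'

// Hypothetical item shape; the real export format is not part of this diff.
interface ExportedItem {
  title: string
  url: string
  savedAt: string
}

const writeExportZip = async (items: ExportedItem[], outputPath: string): Promise<void> => {
  const output = createWriteStream(outputPath)
  const archive = archiver('zip', { zlib: { level: 9 } })

  const done = new Promise<void>((resolve, reject) => {
    output.on('close', () => resolve())
    archive.on('error', (err) => reject(err))
  })

  archive.pipe(output)
  // csv-stringify renders the items as CSV with a header row.
  archive.append(stringify(items, { header: true }), { name: 'items.csv' })
  await archive.finalize()
  return done
}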