Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(server): use @nestjs-cls/transactional to impl database transaction #9759

Draft
wants to merge 1 commit into
base: 01-17-feat_server_enable_cls_plugin_to_store_request_id
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/backend/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
"@google-cloud/opentelemetry-cloud-monitoring-exporter": "^0.20.0",
"@google-cloud/opentelemetry-cloud-trace-exporter": "^2.4.1",
"@google-cloud/opentelemetry-resource-util": "^2.4.0",
"@nestjs-cls/transactional": "^2.4.4",
"@nestjs-cls/transactional-adapter-prisma": "^1.2.7",
"@nestjs/apollo": "^12.2.2",
"@nestjs/common": "^10.4.15",
"@nestjs/core": "^10.4.15",
Expand Down
11 changes: 11 additions & 0 deletions packages/backend/server/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import {
Module,
} from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';
import { ClsPluginTransactional } from '@nestjs-cls/transactional';
import { TransactionalAdapterPrisma } from '@nestjs-cls/transactional-adapter-prisma';
import { PrismaClient } from '@prisma/client';
import type { Request } from 'express';
import { get } from 'lodash-es';
import { ClsModule } from 'nestjs-cls';
Expand Down Expand Up @@ -55,6 +58,14 @@ export const FunctionalityModules = [
return (req.headers['x-request-id'] as string) ?? randomUUID();
},
},
plugins: [
// https://papooch.github.io/nestjs-cls/plugins/available-plugins/transactional/prisma-adapter
new ClsPluginTransactional({
adapter: new TransactionalAdapterPrisma({
prismaInjectionToken: PrismaClient,
}),
}),
],
}),
ConfigModule.forRoot(),
RuntimeModule,
Expand Down
9 changes: 9 additions & 0 deletions packages/backend/server/src/models/base.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { Inject, Logger } from '@nestjs/common';
import { TransactionHost } from '@nestjs-cls/transactional';
import type { TransactionalAdapterPrisma } from '@nestjs-cls/transactional-adapter-prisma';
import { PrismaClient } from '@prisma/client';

import { Config } from '../base';
Expand All @@ -16,4 +18,11 @@ export class BaseModel {

@Inject(PrismaClient)
protected readonly db!: PrismaClient;

@Inject(TransactionHost)
private readonly txHost!: TransactionHost<TransactionalAdapterPrisma>;

protected get tx() {
return this.txHost.tx;
}
}
51 changes: 24 additions & 27 deletions packages/backend/server/src/models/feature.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Injectable } from '@nestjs/common';
import { Transactional } from '@nestjs-cls/transactional';
import { Feature } from '@prisma/client';
import { z } from 'zod';

import { PrismaTransaction } from '../base';
import { BaseModel } from './base';
import { Features, FeatureType } from './common';

Expand All @@ -20,7 +20,7 @@ type FeatureConfigs<T extends FeatureNames> = z.infer<
@Injectable()
export class FeatureModel extends BaseModel {
async get<T extends FeatureNames>(name: T) {
const feature = await this.getLatest(this.db, name);
const feature = await this.getLatest(name);

// All features are hardcoded in the codebase
// It would be a fatal error if the feature is not found in DB.
Expand All @@ -43,6 +43,7 @@ export class FeatureModel extends BaseModel {
};
}

@Transactional()
async upsert<T extends FeatureNames>(name: T, configs: FeatureConfigs<T>) {
const shape = this.getConfigShape(name);
const parseResult = shape.safeParse(configs);
Expand All @@ -58,37 +59,33 @@ export class FeatureModel extends BaseModel {
// TODO(@forehalo):
// could be a simple upsert operation, but we got useless `version` column in the database
// will be fixed when `version` column gets deprecated
const feature = await this.db.$transaction(async tx => {
const latest = await this.getLatest(tx, name);

if (!latest) {
return await tx.feature.create({
data: {
type: FeatureType.Feature,
feature: name,
configs: parsedConfigs,
},
});
} else {
return await tx.feature.update({
where: { id: latest.id },
data: {
configs: parsedConfigs,
},
});
}
});
const latest = await this.getLatest(name);

let feature: Feature;
if (!latest) {
feature = await this.tx.feature.create({
data: {
type: FeatureType.Feature,
feature: name,
configs: parsedConfigs,
},
});
} else {
feature = await this.tx.feature.update({
where: { id: latest.id },
data: {
configs: parsedConfigs,
},
});
}

this.logger.verbose(`Feature ${name} upserted`);

return feature as Feature & { configs: FeatureConfigs<T> };
}

private async getLatest<T extends FeatureNames>(
client: PrismaTransaction,
name: T
) {
return client.feature.findFirst({
private async getLatest<T extends FeatureNames>(name: T) {
return this.tx.feature.findFirst({
where: { feature: name },
orderBy: { version: 'desc' },
});
Expand Down
80 changes: 40 additions & 40 deletions packages/backend/server/src/models/page.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Injectable } from '@nestjs/common';
import { Transactional } from '@nestjs-cls/transactional';
import {
type WorkspacePage as Page,
type WorkspacePageUserPermission as PageUserPermission,
Expand Down Expand Up @@ -87,6 +88,7 @@ export class PageModel extends BaseModel {
/**
* Grant the page member with the given permission.
*/
@Transactional()
async grantMember(
workspaceId: string,
pageId: string,
Expand All @@ -105,50 +107,48 @@ export class PageModel extends BaseModel {

// If the user is already accepted and the new permission is owner, we need to revoke old owner
if (!data || data.type !== permission) {
return await this.db.$transaction(async tx => {
if (data) {
// Update the permission
data = await tx.workspacePageUserPermission.update({
where: {
workspaceId_pageId_userId: {
workspaceId,
pageId,
userId,
},
},
data: { type: permission },
});
} else {
// Create a new permission
data = await tx.workspacePageUserPermission.create({
data: {
if (data) {
// Update the permission
data = await this.tx.workspacePageUserPermission.update({
where: {
workspaceId_pageId_userId: {
workspaceId,
pageId,
userId,
type: permission,
// page permission does not require invitee to accept, the accepted field will be deprecated later.
accepted: true,
},
});
}

// If the new permission is owner, we need to revoke old owner
if (permission === Permission.Owner) {
await tx.workspacePageUserPermission.updateMany({
where: {
workspaceId,
pageId,
type: Permission.Owner,
userId: { not: userId },
},
data: { type: Permission.Admin },
});
this.logger.log(
`Change owner of workspace ${workspaceId} page ${pageId} to user ${userId}`
);
}
return data;
});
},
data: { type: permission },
});
} else {
// Create a new permission
data = await this.tx.workspacePageUserPermission.create({
data: {
workspaceId,
pageId,
userId,
type: permission,
// page permission does not require invitee to accept, the accepted field will be deprecated later.
accepted: true,
},
});
}

// If the new permission is owner, we need to revoke old owner
if (permission === Permission.Owner) {
await this.tx.workspacePageUserPermission.updateMany({
where: {
workspaceId,
pageId,
type: Permission.Owner,
userId: { not: userId },
},
data: { type: Permission.Admin },
});
this.logger.log(
`Change owner of workspace ${workspaceId} page ${pageId} to user ${userId}`
);
}
return data;
}

// nothing to do
Expand Down
105 changes: 52 additions & 53 deletions packages/backend/server/src/models/workspace.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Injectable } from '@nestjs/common';
import { Transactional } from '@nestjs-cls/transactional';
import {
type Workspace,
WorkspaceMemberStatus,
Expand Down Expand Up @@ -125,6 +126,7 @@ export class WorkspaceModel extends BaseModel {
/**
* Grant the workspace member with the given permission and status.
*/
@Transactional()
async grantMember(
workspaceId: string,
userId: string,
Expand Down Expand Up @@ -157,29 +159,27 @@ export class WorkspaceModel extends BaseModel {

// If the user is already accepted and the new permission is owner, we need to revoke old owner
if (data.status === WorkspaceMemberStatus.Accepted || data.accepted) {
return await this.db.$transaction(async tx => {
const updated = await tx.workspaceUserPermission.update({
const updated = await this.tx.workspaceUserPermission.update({
where: {
workspaceId_userId: { workspaceId, userId },
},
data: { type: permission },
});
// If the new permission is owner, we need to revoke old owner
if (permission === Permission.Owner) {
await this.tx.workspaceUserPermission.updateMany({
where: {
workspaceId_userId: { workspaceId, userId },
workspaceId,
type: Permission.Owner,
userId: { not: userId },
},
data: { type: permission },
data: { type: Permission.Admin },
});
// If the new permission is owner, we need to revoke old owner
if (permission === Permission.Owner) {
await tx.workspaceUserPermission.updateMany({
where: {
workspaceId,
type: Permission.Owner,
userId: { not: userId },
},
data: { type: Permission.Admin },
});
this.logger.log(
`Change owner of workspace ${workspaceId} to ${userId}`
);
}
return updated;
});
this.logger.log(
`Change owner of workspace ${workspaceId} to ${userId}`
);
}
return updated;
}

// If the user is not accepted, we can update the status directly
Expand Down Expand Up @@ -407,50 +407,49 @@ export class WorkspaceModel extends BaseModel {
/**
* Refresh the workspace member seat status.
*/
@Transactional()
async refreshMemberSeatStatus(workspaceId: string, memberLimit: number) {
const usedCount = await this.getMemberUsedCount(workspaceId);
const availableCount = memberLimit - usedCount;
if (availableCount <= 0) {
return;
}

return await this.db.$transaction(async tx => {
const members = await tx.workspaceUserPermission.findMany({
select: { id: true, status: true },
where: {
workspaceId,
status: {
in: [
WorkspaceMemberStatus.NeedMoreSeat,
WorkspaceMemberStatus.NeedMoreSeatAndReview,
],
},
const members = await this.tx.workspaceUserPermission.findMany({
select: { id: true, status: true },
where: {
workspaceId,
status: {
in: [
WorkspaceMemberStatus.NeedMoreSeat,
WorkspaceMemberStatus.NeedMoreSeatAndReview,
],
},
// find the oldest members first
orderBy: { createdAt: 'asc' },
});
},
// find the oldest members first
orderBy: { createdAt: 'asc' },
});

const needChange = members.slice(0, availableCount);
const groups = groupBy(needChange, m => m.status);
const needChange = members.slice(0, availableCount);
const groups = groupBy(needChange, m => m.status);

const toPendings = groups.NeedMoreSeat;
if (toPendings) {
// NeedMoreSeat => Pending
await tx.workspaceUserPermission.updateMany({
where: { id: { in: toPendings.map(m => m.id) } },
data: { status: WorkspaceMemberStatus.Pending },
});
}
const toPendings = groups.NeedMoreSeat;
if (toPendings) {
// NeedMoreSeat => Pending
await this.tx.workspaceUserPermission.updateMany({
where: { id: { in: toPendings.map(m => m.id) } },
data: { status: WorkspaceMemberStatus.Pending },
});
}

const toUnderReviews = groups.NeedMoreSeatAndReview;
if (toUnderReviews) {
// NeedMoreSeatAndReview => UnderReview
await tx.workspaceUserPermission.updateMany({
where: { id: { in: toUnderReviews.map(m => m.id) } },
data: { status: WorkspaceMemberStatus.UnderReview },
});
}
});
const toUnderReviews = groups.NeedMoreSeatAndReview;
if (toUnderReviews) {
// NeedMoreSeatAndReview => UnderReview
await this.tx.workspaceUserPermission.updateMany({
where: { id: { in: toUnderReviews.map(m => m.id) } },
data: { status: WorkspaceMemberStatus.UnderReview },
});
}
}

/**
Expand Down
Loading
Loading