Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] External Id hydration #1189

Merged
merged 7 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 4 additions & 15 deletions __test__/unit/user/login.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,13 @@ jest.mock('../../../src/shared/libraries/Log');
describe('Login tests', () => {
beforeEach(() => {
jest.useFakeTimers();
test.stub(
PropertiesExecutor.prototype,
'getOperationsFromCache',
Promise.resolve([]),
);
test.stub(
IdentityExecutor.prototype,
'getOperationsFromCache',
Promise.resolve([]),
);
test.stub(
SubscriptionExecutor.prototype,
'getOperationsFromCache',
Promise.resolve([]),
);
test.stub(PropertiesExecutor.prototype, 'getOperationsFromCache', []);
test.stub(IdentityExecutor.prototype, 'getOperationsFromCache', []);
test.stub(SubscriptionExecutor.prototype, 'getOperationsFromCache', []);
});

afterEach(() => {
jest.runOnlyPendingTimers();
jest.resetAllMocks();
});

Expand Down
8 changes: 7 additions & 1 deletion src/core/CoreModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import { OSModelStoreFactory } from './modelRepo/OSModelStoreFactory';
import Log from '../shared/libraries/Log';
import { logMethodCall } from '../shared/utils/utils';
import { SupportedModel } from './models/SupportedModels';
import { NewRecordsState } from '../shared/models/NewRecordsState';

export default class CoreModule {
public modelRepo?: ModelRepo;
public operationRepo?: OperationRepo;
public initPromise: Promise<void>;
public newRecordsState?: NewRecordsState;

private modelCache: ModelCache;
private initResolver: () => void = () => null;
Expand All @@ -25,7 +27,11 @@ export default class CoreModule {
.then((allCachedOSModels) => {
const modelStores = OSModelStoreFactory.build(allCachedOSModels);
this.modelRepo = new ModelRepo(this.modelCache, modelStores);
this.operationRepo = new OperationRepo(this.modelRepo);
this.newRecordsState = new NewRecordsState();
this.operationRepo = new OperationRepo(
this.modelRepo,
this.newRecordsState,
);
this.initResolver();
})
.catch((e) => {
Expand Down
13 changes: 9 additions & 4 deletions src/core/CoreModuleDirector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,13 @@ export class CoreModuleDirector {
await this.core.resetModelRepoAndCache();
}

public hydrateUser(user: UserData): void {
logMethodCall('CoreModuleDirector.hydrateUser', { user });
public hydrateUser(user: UserData, externalId?: string): void {
logMethodCall('CoreModuleDirector.hydrateUser', { user, externalId });
try {
const identity = this.getIdentityModel();
const properties = this.getPropertiesModel();

const { onesignal_id: onesignalId, external_id: externalId } =
user.identity;
const { onesignal_id: onesignalId } = user.identity;

if (!onesignalId) {
throw new OneSignalError('OneSignal ID is missing from user data');
Expand All @@ -83,6 +82,7 @@ export class CoreModuleDirector {
if (externalId) {
identity?.setExternalId(externalId);
properties?.setExternalId(externalId);
user.identity.external_id = externalId;
}

// identity and properties models are always single, so we hydrate immediately (i.e. replace existing data)
Expand All @@ -109,6 +109,8 @@ export class CoreModuleDirector {
): void {
logMethodCall('CoreModuleDirector._hydrateSubscriptions', {
subscriptions,
onesignalId,
externalId,
});

if (!subscriptions) {
Expand Down Expand Up @@ -175,6 +177,9 @@ export class CoreModuleDirector {
}

/* G E T T E R S */
public getNewRecordsState(): NewRecordsState | undefined {
return this.core.newRecordsState;
}

public getModelByTypeAndId(
modelName: ModelName,
Expand Down
25 changes: 23 additions & 2 deletions src/core/executors/ExecutorBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
import Log from '../../shared/libraries/Log';
import Database from '../../shared/services/Database';
import LocalStorage from '../../shared/utils/LocalStorage';
import { NewRecordsState } from '../../shared/models/NewRecordsState';

const RETRY_AFTER = 5_000;

export default abstract class ExecutorBase {
protected _deltaQueue: CoreDelta<SupportedModel>[] = [];
protected _operationQueue: Operation<SupportedModel>[] = [];
protected _newRecordsState: NewRecordsState;

protected _executeAdd?: (
operation: Operation<SupportedModel>,
Expand All @@ -31,7 +33,10 @@
static OPERATIONS_BATCH_PROCESSING_TIME = 5;
static RETRY_COUNT = 5;

constructor(executorConfig: ExecutorConfig<SupportedModel>) {
constructor(
executorConfig: ExecutorConfig<SupportedModel>,
newRecordsState: NewRecordsState,
) {
setInterval(() => {
Log.debug('OneSignal: checking for operations to process from cache');
const cachedOperations = this.getOperationsFromCache();
Expand All @@ -48,6 +53,8 @@
this._executeAdd = executorConfig.add;
this._executeUpdate = executorConfig.update;
this._executeRemove = executorConfig.remove;

this._newRecordsState = newRecordsState;
}

abstract processDeltaQueue(): void;
Expand Down Expand Up @@ -86,7 +93,7 @@
this._operationQueue = [];
}

protected _getChangeType(oldValue: any, newValue: any): CoreChangeType {

Check warning on line 96 in src/core/executors/ExecutorBase.ts

View workflow job for this annotation

GitHub Actions / test

Unexpected any. Specify a different type

Check warning on line 96 in src/core/executors/ExecutorBase.ts

View workflow job for this annotation

GitHub Actions / test

Unexpected any. Specify a different type
logMethodCall('ExecutorBase._getChangeType', { oldValue, newValue });
const wasPropertyAdded = !oldValue && !!newValue;
const wasPropertyRemoved = !!oldValue && !newValue;
Expand Down Expand Up @@ -124,7 +131,7 @@
if (operation) {
OperationCache.enqueue(operation);

if (this.onlineStatus) {
if (this._canExecute(operation)) {
this._processOperation(operation, ExecutorBase.RETRY_COUNT).catch(
(err) => {
Log.error(err);
Expand Down Expand Up @@ -187,4 +194,18 @@
this._processOperationQueue.call(this);
}
}

private _canExecute(operation: Operation<SupportedModel>): boolean {
if (!this.onlineStatus) {
return false;
}

if (operation.applyToRecordId) {
if (!this._newRecordsState.canAccess(operation.applyToRecordId)) {
return false;
}
}

return true;
}
}
12 changes: 8 additions & 4 deletions src/core/executors/ExecutorFactory.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { NewRecordsState } from '../../shared/models/NewRecordsState';
import { Executor } from '../models/Executor';
import { ExecutorConfig } from '../models/ExecutorConfig';
import { ModelName, SupportedModel } from '../models/SupportedModels';
Expand All @@ -6,14 +7,17 @@ import { PropertiesExecutor } from './PropertiesExecutor';
import { SubscriptionExecutor } from './SubscriptionExecutor';

export class ExecutorFactory {
static build(executorConfig: ExecutorConfig<SupportedModel>): Executor {
static build(
executorConfig: ExecutorConfig<SupportedModel>,
newRecordsState: NewRecordsState,
): Executor {
switch (executorConfig.modelName) {
case ModelName.Identity:
return new IdentityExecutor(executorConfig);
return new IdentityExecutor(executorConfig, newRecordsState);
case ModelName.Properties:
return new PropertiesExecutor(executorConfig);
return new PropertiesExecutor(executorConfig, newRecordsState);
case ModelName.Subscriptions:
return new SubscriptionExecutor(executorConfig);
return new SubscriptionExecutor(executorConfig, newRecordsState);
}
}
}
5 changes: 3 additions & 2 deletions src/core/executors/ExecutorStore.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { NewRecordsState } from '../../shared/models/NewRecordsState';
import { ModelName } from '../models/SupportedModels';
import OSExecutor from './ExecutorBase';
import { EXECUTOR_CONFIG_MAP } from './ExecutorConfigMap';
Expand All @@ -10,10 +11,10 @@ type ExecutorStoreInterface = {
export class ExecutorStore {
store: ExecutorStoreInterface = {};

constructor() {
constructor(newRecordsState: NewRecordsState) {
Object.values(ModelName).forEach((modelName) => {
const config = EXECUTOR_CONFIG_MAP[modelName as ModelName];
this.store[modelName] = ExecutorFactory.build(config);
this.store[modelName] = ExecutorFactory.build(config, newRecordsState);
});
}

Expand Down
8 changes: 6 additions & 2 deletions src/core/executors/IdentityExecutor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { NewRecordsState } from '../../shared/models/NewRecordsState';
import OperationCache from '../caching/OperationCache';
import { CoreChangeType } from '../models/CoreChangeType';
import { PropertyDelta } from '../models/CoreDeltas';
Expand All @@ -8,8 +9,11 @@ import { isPropertyDelta } from '../utils/typePredicates';
import ExecutorBase from './ExecutorBase';

export class IdentityExecutor extends ExecutorBase {
constructor(executorConfig: ExecutorConfig<SupportedModel>) {
super(executorConfig);
constructor(
executorConfig: ExecutorConfig<SupportedModel>,
newRecordsState: NewRecordsState,
) {
super(executorConfig, newRecordsState);
}

processDeltaQueue(): void {
Expand Down
8 changes: 6 additions & 2 deletions src/core/executors/PropertiesExecutor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { NewRecordsState } from '../../shared/models/NewRecordsState';
import ExecutorBase from './ExecutorBase';
import { Operation } from '../operationRepo/Operation';
import { CoreChangeType } from '../models/CoreChangeType';
Expand All @@ -6,8 +7,11 @@ import { ModelName, SupportedModel } from '../models/SupportedModels';
import OperationCache from '../caching/OperationCache';

export class PropertiesExecutor extends ExecutorBase {
constructor(executorConfig: ExecutorConfig<SupportedModel>) {
super(executorConfig);
constructor(
executorConfig: ExecutorConfig<SupportedModel>,
newRecordsState: NewRecordsState,
) {
super(executorConfig, newRecordsState);
}

processDeltaQueue(): void {
Expand Down
8 changes: 6 additions & 2 deletions src/core/executors/SubscriptionExecutor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { NewRecordsState } from '../../shared/models/NewRecordsState';
import OperationCache from '../caching/OperationCache';
import { CoreChangeType } from '../models/CoreChangeType';
import { CoreDelta } from '../models/CoreDeltas';
Expand All @@ -7,8 +8,11 @@ import { Operation } from '../operationRepo/Operation';
import ExecutorBase from './ExecutorBase';

export class SubscriptionExecutor extends ExecutorBase {
constructor(executorConfig: ExecutorConfig<SupportedModel>) {
super(executorConfig);
constructor(
executorConfig: ExecutorConfig<SupportedModel>,
newRecordsState: NewRecordsState,
) {
super(executorConfig, newRecordsState);
}

processDeltaQueue(): void {
Expand Down
3 changes: 3 additions & 0 deletions src/core/modelRepo/ModelRepo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export class ModelRepo extends Subscribable<CoreDelta<SupportedModel>> {
this.broadcast({
model: payload,
changeType: CoreChangeType.Add,
applyToRecordId: payload?.applyToRecordId,
});
}

Expand All @@ -77,6 +78,7 @@ export class ModelRepo extends Subscribable<CoreDelta<SupportedModel>> {
this.broadcast({
model: payload,
changeType: CoreChangeType.Remove,
applyToRecordId: payload?.applyToRecordId,
});
}

Expand All @@ -103,6 +105,7 @@ export class ModelRepo extends Subscribable<CoreDelta<SupportedModel>> {
property: payload.property,
oldValue: payload.oldValue,
newValue: payload.newValue,
applyToRecordId: payload.model?.applyToRecordId,
};
this.broadcast(delta);
}
Expand Down
7 changes: 6 additions & 1 deletion src/core/modelRepo/OSModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
export class OSModel<Model> extends Subscribable<ModelStoreChange<Model>> {
data: Model;
modelId: string;

onesignalId?: string;
awaitOneSignalIdAvailable: Promise<string>;
onesignalIdAvailableCallback?: (onesignalId: string) => void;
externalId?: string;
applyToRecordId?: string;

constructor(
readonly modelName: ModelName,
Expand Down Expand Up @@ -55,13 +55,18 @@
this.externalId = externalId;
}

public setApplyToRecordId(applyToRecordId: string): void {
logMethodCall('setapplyToRecordId', { applyToRecordId });
this.applyToRecordId = applyToRecordId;
}

/**
* We use this method to update the model data.
* Results in a broadcasted update event.
*/
public set(
property: StringKeys<Model>,
newValue: any,

Check warning on line 69 in src/core/modelRepo/OSModel.ts

View workflow job for this annotation

GitHub Actions / test

Unexpected any. Specify a different type
propagate = true,
): void {
logMethodCall('set', { property, newValue });
Expand Down
1 change: 1 addition & 0 deletions src/core/models/CoreDeltas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
export type ModelDelta<Model> = {
model: OSModel<Model>;
changeType: CoreChangeType;
applyToRecordId?: string;
};

export interface PropertyDelta<Model> extends ModelDelta<Model> {
property: StringKeys<Model>;
oldValue?: any;

Check warning on line 13 in src/core/models/CoreDeltas.ts

View workflow job for this annotation

GitHub Actions / test

Unexpected any. Specify a different type
newValue: any;

Check warning on line 14 in src/core/models/CoreDeltas.ts

View workflow job for this annotation

GitHub Actions / test

Unexpected any. Specify a different type
}

export type CoreDelta<Model> = ModelDelta<Model> | PropertyDelta<Model>;
2 changes: 2 additions & 0 deletions src/core/operationRepo/Operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export class Operation<Model> {
timestamp: number;
payload?: Partial<SupportedModel>;
model?: OSModel<Model>;
applyToRecordId?: string;
jwtTokenAvailable: Promise<void>;
jwtToken?: string | null;

Expand All @@ -22,6 +23,7 @@ export class Operation<Model> {
this.operationId = Math.random().toString(36).substring(2);
this.payload = deltas ? this.getPayload(deltas) : undefined;
this.model = deltas ? deltas[deltas.length - 1].model : undefined;
this.applyToRecordId = deltas?.[deltas.length - 1]?.applyToRecordId;
this.timestamp = Date.now();
// eslint-disable-next-line no-async-promise-executor
this.jwtTokenAvailable = new Promise<void>(async (resolve) => {
Expand Down
10 changes: 8 additions & 2 deletions src/core/operationRepo/OperationRepo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@ import { ExecutorStore } from '../executors/ExecutorStore';
import { CoreDelta } from '../models/CoreDeltas';
import { SupportedModel } from '../models/SupportedModels';
import { logMethodCall } from '../../shared/utils/utils';
import { NewRecordsState } from '../../shared/models/NewRecordsState';

export class OperationRepo {
public executorStore: ExecutorStore;
public newRecordsState: NewRecordsState;
private _unsubscribeFromModelRepo: () => void;
private _deltaQueue: CoreDelta<SupportedModel>[] = [];
static DELTAS_BATCH_PROCESSING_TIME = 1;

constructor(private modelRepo: ModelRepo) {
this.executorStore = new ExecutorStore();
constructor(
private modelRepo: ModelRepo,
newRecordsState: NewRecordsState,
) {
this.newRecordsState = newRecordsState;
this.executorStore = new ExecutorStore(this.newRecordsState);

this._unsubscribeFromModelRepo = this.modelRepo.subscribe(
(delta: CoreDelta<SupportedModel>) => {
Expand Down
9 changes: 8 additions & 1 deletion src/core/requestService/SubscriptionRequests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,20 @@ export default class SubscriptionRequests {
logMethodCall('SubscriptionRequests.addSubscription', operation);

const appId = await MainHelper.getAppId();
const { subscription, aliasPair } = processSubscriptionOperation(operation);
const { subscription, aliasPair, subscriptionId } =
processSubscriptionOperation(operation);

const response = await RequestService.createSubscription(
{ appId },
aliasPair,
{ subscription },
);

const status = response.status;
if (status >= 200 && status < 300) {
OneSignal.coreDirector.getNewRecordsState().add(subscriptionId);
}

return SubscriptionRequests._processSubscriptionResponse(response);
}

Expand Down
Loading
Loading