diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java index 4638030dba7b..c169fed75972 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java @@ -69,6 +69,7 @@ import io.cdap.cdap.proto.id.KerberosPrincipalId; import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.proto.security.StandardPermission; +import io.cdap.cdap.proto.sourcecontrol.SortBy; import io.cdap.cdap.security.spi.authentication.AuthenticationContext; import io.cdap.cdap.security.spi.authorization.AccessEnforcer; import io.cdap.cdap.security.spi.authorization.UnauthorizedException; @@ -660,6 +661,34 @@ public void getApplicationDetails(FullHttpRequest request, HttpResponder respond responder.sendJson(HttpResponseStatus.OK, GSON.toJson(result)); } + /** + * Returns the source control metadata and sync status of all applications + * filter query format - "name=<name-filter> AND syncStatus=<SYNCED/UNSYNCED>". + */ + @GET + @Path("/sourcecontrol/apps") + public void getAllNamespaceSourceControlMetadata(FullHttpRequest request, HttpResponder responder, + @PathParam("namespace-id") String namespace, + @QueryParam("pageToken") String pageToken, + @QueryParam("pageSize") Integer pageSize, + @QueryParam("orderBy") SortOrder orderBy, + @QueryParam("orderByOption") SortBy orderByOption, + @QueryParam("filter") String filter + ) throws Exception { + // TODO(CDAP-20989): Implement the API handler + } + + /** + * Returns the source control metadata and sync status of a specific application. + */ + @GET + @Path("/apps/{app-id}/sourcecontrol") + public void getNamespaceSourceControlMetadata(HttpRequest request, HttpResponder responder, + @PathParam("namespace-id") final String namespaceId, + @PathParam("app-id") final String appName) throws Exception { + // TODO(CDAP-20989): Implement the API handler + } + /** * Decodes request coming from the {@link #getApplicationDetails(FullHttpRequest, HttpResponder, * String)} call. diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java index 5b93e1fb0c74..b216d045cfed 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/AppMetadataStore.java @@ -39,6 +39,7 @@ import io.cdap.cdap.common.ConflictException; import io.cdap.cdap.common.app.RunIds; import io.cdap.cdap.common.conf.Constants; +import io.cdap.cdap.common.lang.FunctionWithException; import io.cdap.cdap.internal.app.ApplicationSpecificationAdapter; import io.cdap.cdap.internal.app.runtime.ProgramOptionConstants; import io.cdap.cdap.internal.app.runtime.SystemArguments; @@ -101,23 +102,28 @@ /** * Store for application metadata. * - * This class is mostly responsible for reading and storing run records. Each program run will have + *

This class is mostly responsible for reading and storing run records. Each program run will + * have * several run records corresponding to state changes that occur during the program run. The rowkeys - * are of the form: + * are of the form:

* - * runRecordActive|namespace|app|version|programtype|program|inverted start time|runid - * runRecordCompleted|namespace|app|version|programtype|program|inverted start time|runid + * * - * The run count will have the row key of format: runRecordCount|namespace|app|version|programtype|program + *

These rows get deleted whenever state changes, with a new record written on top. In addition, + * workflow node state is stored as:

* - * These rows get deleted whenever state changes, with a new record written on top. In addition, - * workflow node state is stored as: + * * - * wns|namespace|app|version|programtype|program|runid|nodeid - * - * Workflow node state is updated whenever program state is updated and we notice that the program - * belongs to a workflow. + *

Workflow node state is updated whenever program state is updated and we notice that the + * program belongs to a workflow.

*/ + public class AppMetadataStore { public static final String WORKFLOW_RUNID = "workflowrunid"; @@ -381,6 +387,13 @@ private CloseableIterator getScanApplicationsIterator(StructuredT return table.scan(range, Integer.MAX_VALUE, sortOrder); } + /** + * Retrieves the total number of applications, excluding system applications, stored in the + * ApplicationSpecification table. + * + * @return The total number of applications, excluding system applications. + * @throws IOException If an error occurs while accessing the application metadata store. + */ public long getApplicationCount() throws IOException { // Get number of applications where namespace != SYSTEM (exclude system applications) Collection> fields = ImmutableList.of( @@ -394,6 +407,19 @@ public long getApplicationCount() throws IOException { return getApplicationSpecificationTable().count(ranges); } + /** + * Retrieves the latest version of the specified application. + * + *

This method retrieves the most recently created application version for the specified + * application reference. If no latest version is found, it falls back to treating the -SNAPSHOT + * version as the latest. If still not found, it sorts the versions by version ID, with the larger + * version ID string considered as the latest.

+ * + * @param appReference The reference to the application. + * @return The latest version of the specified application, or {@code null} if the application + * does not exist. + * @throws IOException If an error occurs while accessing the application metadata store. + */ @Nullable public ApplicationMeta getLatest(ApplicationReference appReference) throws IOException { Range range = getLatestApplicationRange(appReference); @@ -431,6 +457,15 @@ public ApplicationMeta getLatest(ApplicationReference appReference) throws IOExc return null; } + /** + * Retrieves a list of all application IDs corresponding to the specified application reference, + * including all versions of the application. + * + * @param appRef The reference to the application. + * @return A list of all application IDs corresponding to the specified application reference, + * including all versions of the application. + * @throws IOException If an error occurs while accessing the application metadata store. + */ public List getAllAppVersionsAppIds(ApplicationReference appRef) throws IOException { List appIds = new ArrayList<>(); @@ -455,16 +490,28 @@ public List getAllAppVersionsAppIds(ApplicationReference appRef) * @throws IOException if failed to read metadata */ public Map getApplicationsForAppIds( - Collection appIds) + Collection appIds, + FunctionWithException getSourceControlMeta) throws IOException { List>> multiKeys = appIds.stream() .map(this::getApplicationPrimaryKeys) .collect(Collectors.toList()); - return getApplicationSpecificationTable().multiRead(multiKeys) - .stream() - .collect(Collectors.toMap( - AppMetadataStore::getApplicationIdFromRow, - this::decodeRow)); + Map map = new HashMap<>(); + Collection rows = getApplicationSpecificationTable().multiRead(multiKeys); + for (StructuredRow row : rows) { + map.put(AppMetadataStore.getApplicationIdFromRow(row), + getSourceControlMetaAndDecodeRow(row, getSourceControlMeta)); + } + return map; + } + + private ApplicationMeta getSourceControlMetaAndDecodeRow(StructuredRow row, + FunctionWithException getSourceControlMeta) + throws IOException { + ApplicationId appId = AppMetadataStore.getApplicationIdFromRow(row); + ApplicationMeta meta = decodeRow(row); + SourceControlMeta sourceControlMeta = getSourceControlMeta.apply(appId); + return new ApplicationMeta(meta.getId(), meta.getSpec(), meta.getChange(), sourceControlMeta); } /** @@ -655,7 +702,8 @@ public int createLatestApplicationVersion(ApplicationId id, ApplicationMeta appM * @throws ConflictException if parent-version provided in the request doesn't match the * latest version, do not allow app to be created */ - public int createApplicationVersion(ApplicationId id, ApplicationMeta appMeta, boolean markAsLatest) + public int createApplicationVersion(ApplicationId id, + ApplicationMeta appMeta, boolean markAsLatest) throws IOException, ConflictException { String parentVersion = Optional.ofNullable(appMeta.getChange()) .map(ChangeDetail::getParentVersion).orElse(null); @@ -712,7 +760,7 @@ void writeApplication(String namespaceId, String appId, String versionId, writeApplicationSerialized(namespaceId, appId, versionId, GSON.toJson( new ApplicationMeta(appId, spec, null, null)), - change, sourceControlMeta, markAsLatest); + change, markAsLatest); updateApplicationEdit(namespaceId, appId); } @@ -774,6 +822,15 @@ public void deleteApplications(String namespaceId) getApplicationSpecificationTable().deleteAll(getNamespaceRange(namespaceId)); } + /** + * Updates the application specification for the specified application ID in the application + * metadata store. + * + * @param appId The ID of the application to update. + * @param spec The updated application specification. + * @throws IOException If failed to upsert + * @throws IllegalArgumentException If the application ID does not exist. + */ public void updateAppSpec(ApplicationId appId, ApplicationSpecification spec) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace("App spec to be updated: id: {}: spec: {}", appId, GSON.toJson(spec)); @@ -1168,6 +1225,18 @@ public RunRecordDetail recordProgramOrphaned(ProgramRunId programRunId, long end return meta; } + /** + * Records the rejected state for the specified program run in the run records. + * + * @param programRunId The ID of the program run to record the rejected state for. + * @param runtimeArgs The runtime arguments passed to the program run. + * @param systemArgs The system arguments passed to the program run. + * @param sourceId The source ID associated with the program run. + * @param artifactId The artifact ID associated with the program run, if any. + * @return The details of the recorded run record for the rejected program run, or {@code null} if + * the rejected state could not be recorded. + * @throws IOException If an error occurs while accessing the run records. + */ @Nullable public RunRecordDetail recordProgramRejected(ProgramRunId programRunId, Map runtimeArgs, Map systemArgs, @@ -1821,6 +1890,24 @@ public Map getRuns(@Nullable ProgramId programId, } } + /** + * Retrieves a filtered collection of {@link RunRecordDetail} objects associated with a program. + * The result set is limited in size and can include runs with different statuses, depending on + * the provided parameters. + * + * @param programReference The {@link ProgramReference} object identifying the program. + * @param status The {@link ProgramRunStatus} to filter the runs by. If + * {@code ProgramRunStatus.ALL} is provided, both active and completed + * runs might be included in the result. + * @param startTime The start timestamp (inclusive) to filter runs. + * @param endTime The end timestamp (inclusive) to filter runs. + * @param limit The maximum number of desired runs in the result set. + * @param filter An optional {@link Predicate} to apply additional filtering on the + * {@link RunRecordDetail} objects. + * @return A {@link Map} where keys are {@link ProgramRunId} and values are the corresponding + * {@link RunRecordDetail} objects. + * @throws IOException If an error occurs during data retrieval. + */ public Map getAllProgramRuns(ProgramReference programReference, ProgramRunStatus status, long startTime, long endTime, int limit, @@ -1886,6 +1973,14 @@ public Map getRuns(ApplicationId applicationId, // TODO: getRun is duplicated in cdap-watchdog AppMetadataStore class. // Any changes made here will have to be made over there too. // JIRA https://issues.cask.co/browse/CDAP-2172 + + /** + * Retrieves details of the program run identified by the specified program run ID. + * + * @param programRun The ID of the program run to retrieve details for. + * @return The details of the program run, or {@code null} if the program run does not exist. + * @throws IOException If an error occurs while accessing the run records. + */ @Nullable public RunRecordDetail getRun(ProgramRunId programRun) throws IOException { // Query active run record first @@ -1908,6 +2003,15 @@ public RunRecordDetail getRun(ProgramRunId programRun) throws IOException { // Fetching run record ignoring versions // We do this for places like LogHandler APIs that does not pass in a version + + /** + * Retrieves details of the program run identified by the specified program reference and run ID. + * + * @param programRef The reference to the program. + * @param runId The ID of the program run to retrieve details for. + * @return The details of the program run, or {@code null} if the program run does not exist. + * @throws IOException If an error occurs while accessing the run records. + */ @Nullable public RunRecordDetail getRun(ProgramReference programRef, String runId) throws IOException { // Query active run record first @@ -2215,6 +2319,14 @@ private long getInvertedTsScanKeyPart(long time) { return invertedTsKey < Long.MAX_VALUE ? invertedTsKey + 1 : invertedTsKey; } + /** + * Deletes the historical run records and counts associated with a specific application. + * + * @param namespaceId The namespace ID of the application. + * @param appId The application ID. + * @param versionId The version ID of the application. + * @throws IOException If an error occurs while deleting the historical data. + */ public void deleteProgramHistory(String namespaceId, String appId, String versionId) throws IOException { ApplicationId applicationId = new ApplicationId(namespaceId, appId, versionId); @@ -2230,6 +2342,12 @@ public void deleteProgramHistory(String namespaceId, String appId, String versio Range.singleton(getCountApplicationPrefix(TYPE_RUN_RECORD_UPGRADE_COUNT, applicationId))); } + /** + * Deletes the historical run records and counts associated with a specific application. + * + * @param applicationReference The reference to the application. + * @throws IOException If an error occurs while deleting the historical data. + */ public void deleteProgramHistory(ApplicationReference applicationReference) throws IOException { getRunRecordsTable() @@ -2244,6 +2362,13 @@ public void deleteProgramHistory(ApplicationReference applicationReference) Range.singleton(getCountApplicationRefPrefix(TYPE_RUN_RECORD_UPGRADE_COUNT, applicationReference))); } + /** + * Deletes the historical run records and counts associated with all applications within the + * specified namespace. + * + * @param namespaceId The ID of the namespace containing the applications. + * @throws IOException If an error occurs while deleting the historical data. + */ public void deleteProgramHistory(NamespaceId namespaceId) throws IOException { getRunRecordsTable().deleteAll( Range.singleton(getRunRecordNamespacePrefix(TYPE_RUN_RECORD_ACTIVE, namespaceId))); @@ -2274,6 +2399,15 @@ public void setWorkflowToken(ProgramRunId workflowRunId, WorkflowToken workflowT getWorkflowsTable().upsert(keys); } + /** + * Retrieves the workflow token associated with the specified workflow run. + * + * @param workflowId The {@link ProgramId} representing the workflow. + * @param workflowRunId The unique ID of the workflow run. + * @return The workflow token associated with the specified workflow run. + * @throws IOException If an error occurs while retrieving the workflow token. + * @throws IllegalArgumentException If the program ID does not represent a workflow. + */ public WorkflowToken getWorkflowToken(ProgramId workflowId, String workflowRunId) throws IOException { Preconditions.checkArgument(ProgramType.WORKFLOW == workflowId.getType()); @@ -2292,7 +2426,14 @@ public WorkflowToken getWorkflowToken(ProgramId workflowId, String workflowRunId } /** - * @return programs that were running between given start and end time and are completed + * Retrieves the set of program runs that were running between the specified start and end times + * and have completed. + * + * @param startTimeInSecs The start time (in seconds since the epoch) of the time range. + * @param endTimeInSecs The end time (in seconds since the epoch) of the time range. + * @return A {@link Set} of {@link RunId} objects representing the completed program runs within + * the specified time range. + * @throws IOException If an error occurs while retrieving the program runs. */ public Set getRunningInRangeCompleted(long startTimeInSecs, long endTimeInSecs) throws IOException { @@ -2304,7 +2445,14 @@ public Set getRunningInRangeCompleted(long startTimeInSecs, long endTimeI } /** - * @return programs that were running between given start and end time and are active + * Retrieves the set of program runs that were running between the specified start and end times + * and are active. + * + * @param startTimeInSecs The start time (in seconds since the epoch) of the time range. + * @param endTimeInSecs The end time (in seconds since the epoch) of the time range. + * @return A {@link Set} of {@link RunId} objects representing the active program runs within the + * specified time range. + * @throws IOException If an error occurs while retrieving the program runs. */ public Set getRunningInRangeActive(long startTimeInSecs, long endTimeInSecs) throws IOException { @@ -2393,7 +2541,7 @@ public Map getProgramTotalRunCounts( } /** - * Gets the id of the last fetched message that was set for a subscriber of the given TMS topic + * Gets the id of the last fetched message that was set for a subscriber of the given TMS topic. * * @param topic the topic to lookup the last message id * @param subscriber the subscriber name @@ -2451,6 +2599,12 @@ Set getRunningInRangeForStatus(String statusKey, long startTimeInSecs, return runIds; } + /** + * Deletes all metadata tables in the App Metadata Store. This method is intended for use only in + * testing environments and will delete all metadata store information. + * + * @throws IOException If an error occurs while deleting the metadata tables. + */ @VisibleForTesting // USE ONLY IN TESTS: WILL DELETE ALL METADATA STORE INFO public void deleteAllAppMetadataTables() throws IOException { @@ -2545,15 +2699,13 @@ private Range getLatestApplicationRange(ApplicationReference appReference) { } private void writeApplicationSerialized(String namespaceId, String appId, String versionId, - String serialized, @Nullable ChangeDetail change, - @Nullable SourceControlMeta sourceControlMeta) + String serialized, @Nullable ChangeDetail change) throws IOException { - writeApplicationSerialized(namespaceId, appId, versionId, serialized, change, sourceControlMeta, true); + writeApplicationSerialized(namespaceId, appId, versionId, serialized, change, true); } private void writeApplicationSerialized(String namespaceId, String appId, String versionId, - String serialized, @Nullable ChangeDetail change, - @Nullable SourceControlMeta sourceControlMeta, boolean markAsLatest) + String serialized, @Nullable ChangeDetail change, boolean markAsLatest) throws IOException { List> fields = getApplicationPrimaryKeys(namespaceId, appId, versionId); fields.add( @@ -2567,11 +2719,6 @@ private void writeApplicationSerialized(String namespaceId, String appId, String change.getDescription())); } fields.add(Fields.booleanField(StoreDefinition.AppMetadataStore.LATEST_FIELD, markAsLatest)); - - if (sourceControlMeta != null) { - fields.add(Fields.stringField(StoreDefinition.AppMetadataStore.SOURCE_CONTROL_META, - GSON.toJson(sourceControlMeta))); - } getApplicationSpecificationTable().upsert(fields); } @@ -2649,10 +2796,6 @@ private ApplicationMeta decodeRow(StructuredRow row) { ApplicationMeta meta = GSON.fromJson( row.getString(StoreDefinition.AppMetadataStore.APPLICATION_DATA_FIELD), ApplicationMeta.class); - SourceControlMeta sourceControl = GSON.fromJson( - row.getString(StoreDefinition.AppMetadataStore.SOURCE_CONTROL_META), - SourceControlMeta.class); - ApplicationSpecification spec = meta.getSpec(); String id = meta.getId(); ChangeDetail changeDetail; @@ -2662,7 +2805,7 @@ private ApplicationMeta decodeRow(StructuredRow row) { changeDetail = new ChangeDetail(changeSummary, null, author, creationTimeMillis, latest); } - return new ApplicationMeta(id, spec, changeDetail, sourceControl); + return new ApplicationMeta(id, spec, changeDetail); } private void writeToStructuredTableWithPrimaryKeys( @@ -2898,9 +3041,7 @@ private AppScanEntry(StructuredRow row) { this.changeDetail = new ChangeDetail(changeSummary, null, author, creationTimeMillis, latest); } - this.sourceControlMeta = GSON.fromJson( - row.getString(StoreDefinition.AppMetadataStore.SOURCE_CONTROL_META), - SourceControlMeta.class); + this.sourceControlMeta = null; } @Override diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/DefaultStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/DefaultStore.java index a35340f26445..c031b74a7beb 100644 --- a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/DefaultStore.java +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/DefaultStore.java @@ -42,6 +42,7 @@ import io.cdap.cdap.common.ConflictException; import io.cdap.cdap.common.NotFoundException; import io.cdap.cdap.common.ProgramNotFoundException; +import io.cdap.cdap.common.lang.FunctionWithException; import io.cdap.cdap.data2.dataset2.DatasetFramework; import io.cdap.cdap.internal.app.ForwardingApplicationSpecification; import io.cdap.cdap.internal.app.store.state.AppStateKey; @@ -143,6 +144,10 @@ private AppMetadataStore getAppMetadataStore(StructuredTableContext context) { return AppMetadataStore.create(context); } + private SourceControlMetadataStore getSourceControlMetadataStore(StructuredTableContext context) { + return SourceControlMetadataStore.create(context); + } + private WorkflowTable getWorkflowTable(StructuredTableContext context) throws TableNotFoundException { return new WorkflowTable(context.getTable(StoreDefinition.WorkflowStore.WORKFLOW_STATISTICS)); @@ -596,6 +601,7 @@ public void markApplicationsLatest(Collection appIds) @Override public int addLatestApplication(ApplicationId id, ApplicationMeta meta) throws ConflictException { return TransactionRunners.run(transactionRunner, context -> { + getSourceControlMetadataStore(context).write(id, meta.getSourceControlMeta()); return getAppMetadataStore(context).createLatestApplicationVersion(id, meta); }, ConflictException.class); } @@ -603,20 +609,22 @@ public int addLatestApplication(ApplicationId id, ApplicationMeta meta) throws C @Override public int addApplication(ApplicationId id, ApplicationMeta meta, boolean isLatest) throws ConflictException { return TransactionRunners.run(transactionRunner, context -> { + getSourceControlMetadataStore(context).write(id, meta.getSourceControlMeta()); return getAppMetadataStore(context).createApplicationVersion(id, meta, isLatest); }, ConflictException.class); } @Override - public void updateApplicationSourceControlMeta(Map updateRequests) + public void updateApplicationSourceControlMeta( + Map updateRequests) throws IOException { TransactionRunners.run(transactionRunner, context -> { - AppMetadataStore mds = getAppMetadataStore(context); + SourceControlMetadataStore sourceControlMetadataStore = getSourceControlMetadataStore(context); + AppMetadataStore appMetadataStore = getAppMetadataStore(context); for (Map.Entry updateRequest : updateRequests.entrySet()) { - try { - mds.updateAppScmMeta(updateRequest.getKey(), updateRequest.getValue()); - } catch (ApplicationNotFoundException e) { - // ignore this exception and continue updating the other applications + ApplicationId appId = updateRequest.getKey(); + if (appMetadataStore.getApplication(appId) != null) { + sourceControlMetadataStore.write(appId, updateRequest.getValue()); } } }, IOException.class); @@ -737,6 +745,9 @@ public void removeApplication(ApplicationReference appRef) { AppMetadataStore metaStore = getAppMetadataStore(context); metaStore.deleteApplication(appRef); metaStore.deleteProgramHistory(appRef); + getSourceControlMetadataStore(context).delete( + new ApplicationId(appRef.getNamespace(), + appRef.getApplication())); }); } @@ -750,6 +761,7 @@ public void removeApplication(ApplicationId id) { AppMetadataStore metaStore = getAppMetadataStore(context); metaStore.deleteApplication(id.getNamespace(), id.getApplication(), id.getVersion()); metaStore.deleteProgramHistory(id.getNamespace(), id.getApplication(), id.getVersion()); + getSourceControlMetadataStore(context).delete(id); }); } @@ -762,6 +774,8 @@ public void removeAll(NamespaceId id) { AppMetadataStore metaStore = getAppMetadataStore(context); metaStore.deleteApplications(id.getNamespace()); metaStore.deleteProgramHistory(id); + getSourceControlMetadataStore(context).deleteAll( + id.getNamespace()); }); } @@ -782,7 +796,13 @@ public Map getRuntimeArguments(ProgramRunId programRunId) { @Override public ApplicationMeta getApplicationMetadata(ApplicationId id) { return TransactionRunners.run(transactionRunner, context -> { - return getApplicationMeta(getAppMetadataStore(context), id); + ApplicationMeta meta = getApplicationMeta(getAppMetadataStore(context), id); + if (meta == null) { + return null; + } + SourceControlMeta sourceControlMeta = getSourceControlMetadataStore( + context).get(id); + return new ApplicationMeta(meta.getId(), meta.getSpec(), meta.getChange(), sourceControlMeta); }); } @@ -897,14 +917,19 @@ private boolean scanApplicationsWithReorder(ScanApplicationsRequest request, @Override public Map getApplications(Collection ids) { return TransactionRunners.run(transactionRunner, context -> { - return getAppMetadataStore(context).getApplicationsForAppIds(ids); + FunctionWithException sourceControlRetriever + = appId -> getSourceControlMetadataStore( + context).get(appId); + return getAppMetadataStore( + context).getApplicationsForAppIds(ids, sourceControlRetriever); }); } @Override public void setAppSourceControlMeta(ApplicationId appId, SourceControlMeta sourceControlMeta) { TransactionRunners.run(transactionRunner, context -> { - getAppMetadataStore(context).setAppSourceControlMeta(appId, sourceControlMeta); + getSourceControlMetadataStore(context).write(appId, + sourceControlMeta); }); } @@ -912,7 +937,9 @@ public void setAppSourceControlMeta(ApplicationId appId, SourceControlMeta sourc @Nullable public SourceControlMeta getAppSourceControlMeta(ApplicationReference appRef) { return TransactionRunners.run(transactionRunner, context -> { - return getAppMetadataStore(context).getAppSourceControlMeta(appRef); + return getSourceControlMetadataStore(context) + .get(new ApplicationId(appRef.getNamespace(), + appRef.getApplication())); }); } @@ -928,7 +955,14 @@ public Map getPrograms(Collection @Nullable public ApplicationMeta getLatest(ApplicationReference appRef) { return TransactionRunners.run(transactionRunner, context -> { - return getAppMetadataStore(context).getLatest(appRef); + ApplicationMeta meta = getAppMetadataStore(context).getLatest(appRef); + if (meta == null) { + return meta; + } + SourceControlMeta sourceControlMeta = getSourceControlMetadataStore(context) + .get(new ApplicationId(appRef.getNamespace(), + appRef.getApplication())); + return new ApplicationMeta(meta.getId(), meta.getSpec(), meta.getChange(), sourceControlMeta); }); } diff --git a/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/SourceControlMetadataStore.java b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/SourceControlMetadataStore.java new file mode 100644 index 000000000000..3ee117996bdd --- /dev/null +++ b/cdap-app-fabric/src/main/java/io/cdap/cdap/internal/app/store/SourceControlMetadataStore.java @@ -0,0 +1,207 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.store; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.sourcecontrol.SourceControlMeta; +import io.cdap.cdap.spi.data.StructuredRow; +import io.cdap.cdap.spi.data.StructuredTable; +import io.cdap.cdap.spi.data.StructuredTableContext; +import io.cdap.cdap.spi.data.TableNotFoundException; +import io.cdap.cdap.spi.data.table.field.Field; +import io.cdap.cdap.spi.data.table.field.Fields; +import io.cdap.cdap.spi.data.table.field.Range; +import io.cdap.cdap.store.StoreDefinition; +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import javax.annotation.Nullable; + +/** + * Store for namespace and repository source control metadata. + */ +public class SourceControlMetadataStore { + + private StructuredTable namespaceSourceControlMetadataTable; + private final StructuredTableContext context; + + public static SourceControlMetadataStore create(StructuredTableContext context) { + return new SourceControlMetadataStore(context); + } + + private SourceControlMetadataStore(StructuredTableContext context) { + this.context = context; + } + + private StructuredTable getNamespaceSourceControlMetadataTable() { + try { + if (namespaceSourceControlMetadataTable == null) { + namespaceSourceControlMetadataTable = context.getTable( + StoreDefinition.NamespaceSourceControlMetadataStore.NAMESPACE_SOURCE_CONTROL_METADATA); + } + } catch (TableNotFoundException e) { + throw new RuntimeException(e); + } + return namespaceSourceControlMetadataTable; + } + + /** + * Retrieves the source control metadata for the specified application ID in the namespace from + * {@code NamespaceSourceControlMetadata} table. + * + * @param appId {@link ApplicationId} for which the source control metadata is being retrieved. + * @return The {@link SourceControlMeta} associated with the application ID, or {@code null} if no + * metadata is found. + * @throws IOException If it fails to read the metadata. + */ + @Nullable + public SourceControlMeta get(ApplicationId appId) throws IOException { + List> primaryKey = getPrimaryKey(appId); + StructuredTable table = getNamespaceSourceControlMetadataTable(); + Optional row = table.read(primaryKey); + + return row.map(nonNullRow -> { + String specificationHash = nonNullRow.getString( + StoreDefinition.NamespaceSourceControlMetadataStore.SPECIFICATION_HASH_FIELD); + String commitId = nonNullRow.getString( + StoreDefinition.NamespaceSourceControlMetadataStore.COMMIT_ID_FIELD); + Long lastSynced = nonNullRow.getLong( + StoreDefinition.NamespaceSourceControlMetadataStore.LAST_MODIFIED_FIELD); + if (specificationHash == null && commitId == null && lastSynced == 0L) { + return null; + } + return new SourceControlMeta(specificationHash, commitId, Instant.ofEpochMilli(lastSynced)); + }).orElse(null); + } + + + /** + * Sets the source control metadata for the specified application ID in the namespace. Source + * control metadata will be null when the application is deployed in the namespace. It will be + * non-null when the application is pulled from the remote repository and deployed in the + * namespace. + * + * @param appId {@link ApplicationId} for which the source control metadata is being + * set. + * @param sourceControlMeta The {@link SourceControlMeta} to be set. Can be {@code null} if + * application is just deployed. + * @throws IOException If failed to write the data. + */ + public void write(ApplicationId appId, + @Nullable SourceControlMeta sourceControlMeta) + throws IOException { + // In the Namespace Pipelines page, the sync status (SYNCED or UNSYNCED) + // and last modified of all the applications deployed in the namespace needs to be shown. + // If source control information is not added when the app is deployed, the data will + // split into two tables. + // JOIN operation is not currently supported yet. The filtering (eg, + // filter on UNSYNCED sync status) , sorting, searching , pagination becomes difficult. + // Instead of doing filtering, searching, sorting in memory, it will happen at + // database level. + StructuredTable scmTable = getNamespaceSourceControlMetadataTable(); + scmTable.upsert(getNamespaceSourceControlMetaFields(appId, sourceControlMeta)); + } + + /** + * Deletes the source control metadata associated with the specified application ID from the + * namespace. + * + * @param appId {@link ApplicationId} whose source control metadata is to be deleted. + * @throws IOException if it failed to read or delete the metadata + */ + public void delete(ApplicationId appId) throws IOException { + getNamespaceSourceControlMetadataTable().delete(getPrimaryKey(appId)); + } + + /** + * Deletes all rows of source control metadata within the specified namespace. + * + * @param namespace The namespace for which all source control metadata rows are to be deleted. + * @throws IOException if it failed to read or delete the metadata. + */ + public void deleteAll(String namespace) throws IOException { + getNamespaceSourceControlMetadataTable().deleteAll(getNamespaceRange(namespace)); + } + + private Collection> getNamespaceSourceControlMetaFields(ApplicationId appId, + SourceControlMeta scmMeta) throws IOException { + List> fields = getPrimaryKey(appId); + fields.add(Fields.stringField( + StoreDefinition.NamespaceSourceControlMetadataStore.SPECIFICATION_HASH_FIELD, + scmMeta == null ? "" : scmMeta.getFileHash())); + fields.add( + Fields.stringField(StoreDefinition.NamespaceSourceControlMetadataStore.COMMIT_ID_FIELD, + scmMeta == null ? "" : scmMeta.getCommitId())); + // Whenever an app is deployed, the expected behavior is that the last modified field will be + // retained and not reset. + Long lastModified = 0L; + if (scmMeta != null) { + lastModified = scmMeta.getLastSyncedAt().toEpochMilli(); + } else { + SourceControlMeta sourceControlMeta = get(appId); + if (sourceControlMeta != null) { + lastModified = sourceControlMeta.getLastSyncedAt().toEpochMilli(); + } + } + fields.add( + Fields.longField(StoreDefinition.NamespaceSourceControlMetadataStore.LAST_MODIFIED_FIELD, + lastModified)); + fields.add( + Fields.booleanField(StoreDefinition.NamespaceSourceControlMetadataStore.IS_SYNCED_FIELD, + scmMeta == null ? false : true)); + return fields; + } + + private List> getPrimaryKey(ApplicationId appId) { + List> primaryKey = new ArrayList<>(); + primaryKey.add( + Fields.stringField(StoreDefinition.NamespaceSourceControlMetadataStore.NAMESPACE_FIELD, + appId.getNamespace())); + primaryKey.add( + Fields.stringField(StoreDefinition.NamespaceSourceControlMetadataStore.TYPE_FIELD, + appId.getEntityType().toString())); + primaryKey.add( + Fields.stringField(StoreDefinition.NamespaceSourceControlMetadataStore.NAME_FIELD, + appId.getEntityName())); + return primaryKey; + } + + private Range getNamespaceRange(String namespaceId) { + return Range.singleton( + ImmutableList.of( + Fields.stringField(StoreDefinition.AppMetadataStore.NAMESPACE_FIELD, namespaceId))); + } + + /** + * Deletes all rows from the namespace source control metadata table. Only to be used in testing. + * + * @throws IOException If an I/O error occurs while deleting the metadata. + */ + @VisibleForTesting + void deleteNamespaceSourceControlMetadataTable() throws IOException { + getNamespaceSourceControlMetadataTable().deleteAll( + Range.from(ImmutableList.of( + Fields.stringField( + StoreDefinition.NamespaceSourceControlMetadataStore.NAMESPACE_FIELD, "")), + Range.Bound.INCLUSIVE)); + } +} diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/ApplicationLifecycleServiceTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/ApplicationLifecycleServiceTest.java index c411940e7230..4eb9abf44432 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/ApplicationLifecycleServiceTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/services/ApplicationLifecycleServiceTest.java @@ -710,7 +710,12 @@ public void testUpdateSourceControlMeta() throws Exception { // deploy an app, then update its scm meta deploy(AllProgramsApp.class, 200, Constants.Gateway.API_VERSION_3_TOKEN, TEST_NAMESPACE1); ApplicationDetail applicationDetail = getAppDetails(TEST_NAMESPACE1, AllProgramsApp.NAME); - Assert.assertNull(applicationDetail.getSourceControlMeta()); + // Changed the assertions because source control metadata was updated with a non-null value in + // test testUpdateSourceControlMetaWithDuplicateAppIds for a specific appId. In this test, + // we have deployed this appId, which makes file hash and commitId null but last synced value is retained + Assert.assertNull(applicationDetail.getSourceControlMeta().getFileHash()); + Assert.assertNull(applicationDetail.getSourceControlMeta().getCommitId()); + Assert.assertNotNull(applicationDetail.getSourceControlMeta().getLastSyncedAt()); applicationLifecycleService.updateSourceControlMeta( new NamespaceId(TEST_NAMESPACE1), @@ -733,7 +738,6 @@ public void testUpdateSourceControlMetaWithNonExistingApp() throws Exception { // deploy an app, then update its scm meta deploy(AllProgramsApp.class, 200, Constants.Gateway.API_VERSION_3_TOKEN, TEST_NAMESPACE1); ApplicationDetail applicationDetail = getAppDetails(TEST_NAMESPACE1, AllProgramsApp.NAME); - Assert.assertNull(applicationDetail.getSourceControlMeta()); applicationLifecycleService.updateSourceControlMeta( new NamespaceId(TEST_NAMESPACE1), diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/AppMetadataStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/AppMetadataStoreTest.java index 7fe5232ee566..e4e5c7f04453 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/AppMetadataStoreTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/AppMetadataStoreTest.java @@ -26,6 +26,7 @@ import io.cdap.cdap.api.artifact.ArtifactId; import io.cdap.cdap.app.store.ScanApplicationsRequest; import io.cdap.cdap.common.app.RunIds; +import io.cdap.cdap.common.lang.FunctionWithException; import io.cdap.cdap.common.utils.ProjectInfo; import io.cdap.cdap.internal.AppFabricTestHelper; import io.cdap.cdap.internal.app.ApplicationSpecificationAdapter; @@ -42,6 +43,7 @@ import io.cdap.cdap.proto.id.ProgramId; import io.cdap.cdap.proto.id.ProgramReference; import io.cdap.cdap.proto.id.ProgramRunId; +import io.cdap.cdap.proto.sourcecontrol.SourceControlMeta; import io.cdap.cdap.spi.data.SortOrder; import io.cdap.cdap.spi.data.StructuredTable; import io.cdap.cdap.spi.data.table.field.Field; @@ -963,9 +965,11 @@ public void testBatchApplications() { for (int i = 0; i < 30; i++) { appIds.add(NamespaceId.DEFAULT.app("test" + i)); } + FunctionWithException sourceControlRetriever + = appId -> new SourceControlMeta("fileHash", "commitId", Instant.now()); Map result = TransactionRunners.run(transactionRunner, context -> { AppMetadataStore store = AppMetadataStore.create(context); - return store.getApplicationsForAppIds(appIds); + return store.getApplicationsForAppIds(appIds, sourceControlRetriever); }); Assert.assertEquals(20, result.size()); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/NoSqlSourceControlMetadataTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/NoSqlSourceControlMetadataTest.java new file mode 100644 index 000000000000..78588b041e90 --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/NoSqlSourceControlMetadataTest.java @@ -0,0 +1,39 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.store; + +import com.google.inject.Injector; +import io.cdap.cdap.internal.AppFabricTestHelper; +import io.cdap.cdap.proto.id.NamespaceId; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +public class NoSqlSourceControlMetadataTest extends SourceControlMetadataStoreTest { + + @BeforeClass + public static void beforeClass() throws Exception { + Injector injector = AppFabricTestHelper.getInjector(); + AppFabricTestHelper.ensureNamespaceExists(NamespaceId.DEFAULT); + transactionRunner = injector.getInstance(TransactionRunner.class); + } + + @AfterClass + public static void tearDown() { + AppFabricTestHelper.shutdown(); + } +} diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/SourceControlMetadataStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/SourceControlMetadataStoreTest.java new file mode 100644 index 000000000000..996a39590783 --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/SourceControlMetadataStoreTest.java @@ -0,0 +1,96 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.store; + +import static org.junit.Assert.assertEquals; + +import io.cdap.cdap.proto.id.ApplicationId; +import io.cdap.cdap.proto.sourcecontrol.SourceControlMeta; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.spi.data.transaction.TransactionRunners; +import java.time.Instant; +import org.junit.Before; +import org.junit.Test; + +public abstract class SourceControlMetadataStoreTest { + + private static final String NAMESPACE = "testNamespace"; + private static final String COMMIT_ID = "testCommitId"; + private static final String SPEC_HASH = "testSpecHash"; + private static final Instant LAST_MODIFIED = Instant.now(); + private static final String NAME = "testName"; + private static final ApplicationId APP_ID = new ApplicationId(NAMESPACE, NAME); + private static final SourceControlMeta SOURCE_CONTROL_META = new SourceControlMeta(SPEC_HASH, + COMMIT_ID, LAST_MODIFIED); + + protected static TransactionRunner transactionRunner; + + @Before + public void before() { + TransactionRunners.run(transactionRunner, context -> { + SourceControlMetadataStore store = SourceControlMetadataStore.create(context); + store.deleteNamespaceSourceControlMetadataTable(); + }); + + TransactionRunners.run(transactionRunner, context -> { + SourceControlMetadataStore store = SourceControlMetadataStore.create(context); + store.write(APP_ID, SOURCE_CONTROL_META); + }); + } + + @Test + public void testSetNamespaceSourceControlMeta() throws Exception { + TransactionRunners.run(transactionRunner, context -> { + SourceControlMetadataStore store = SourceControlMetadataStore.create(context); + ApplicationId appId = new ApplicationId(NAMESPACE, "test2"); + store.write(appId, SOURCE_CONTROL_META); + SourceControlMeta sourceControlMeta = store.get(appId); + assertEquals(SOURCE_CONTROL_META, sourceControlMeta); + }); + } + + @Test + public void testGetNamespaceSourceControlMeta() throws Exception { + TransactionRunners.run(transactionRunner, context -> { + SourceControlMetadataStore store = SourceControlMetadataStore.create(context); + SourceControlMeta scmMeta = store.get(APP_ID); + assertEquals(SOURCE_CONTROL_META, scmMeta); + }); + } + + @Test + public void testUpdateNamespaceSourceControlMeta() throws Exception { + SourceControlMeta newSourceControlMeta = new SourceControlMeta("newFileHash", "newCommitId", + Instant.now()); + TransactionRunners.run(transactionRunner, context -> { + SourceControlMetadataStore store = SourceControlMetadataStore.create(context); + store.write(APP_ID, newSourceControlMeta); + SourceControlMeta scmMeta = store.get(APP_ID); + assertEquals(newSourceControlMeta, scmMeta); + }); + } + + @Test + public void testDeleteNamespaceSourceControlMetadata() throws Exception { + TransactionRunners.run(transactionRunner, context -> { + SourceControlMetadataStore store = SourceControlMetadataStore.create(context); + store.delete(APP_ID); + SourceControlMeta scmMeta = store.get(APP_ID); + assertEquals(null, scmMeta); + }); + } +} diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/SqlAppMetadataStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/SqlAppMetadataStoreTest.java index 8593b7ba59ad..9b18e0219ec4 100644 --- a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/SqlAppMetadataStoreTest.java +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/SqlAppMetadataStoreTest.java @@ -61,7 +61,8 @@ public static void beforeClass() throws IOException, TableAlreadyExistsException new AbstractModule() { @Override protected void configure() { - bind(MetricsCollectionService.class).to(NoOpMetricsCollectionService.class).in(Scopes.SINGLETON); + bind(MetricsCollectionService.class).to(NoOpMetricsCollectionService.class) + .in(Scopes.SINGLETON); } } ); diff --git a/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/SqlSourceControlMetadataStoreTest.java b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/SqlSourceControlMetadataStoreTest.java new file mode 100644 index 000000000000..9fce1ab516a4 --- /dev/null +++ b/cdap-app-fabric/src/test/java/io/cdap/cdap/internal/app/store/SqlSourceControlMetadataStoreTest.java @@ -0,0 +1,76 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.internal.app.store; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Scopes; +import io.cdap.cdap.api.metrics.MetricsCollectionService; +import io.cdap.cdap.common.conf.CConfiguration; +import io.cdap.cdap.common.guice.ConfigModule; +import io.cdap.cdap.common.guice.LocalLocationModule; +import io.cdap.cdap.common.metrics.NoOpMetricsCollectionService; +import io.cdap.cdap.data.runtime.StorageModule; +import io.cdap.cdap.data.runtime.SystemDatasetRuntimeModule; +import io.cdap.cdap.spi.data.StructuredTableAdmin; +import io.cdap.cdap.spi.data.TableAlreadyExistsException; +import io.cdap.cdap.spi.data.sql.PostgresInstantiator; +import io.cdap.cdap.spi.data.transaction.TransactionRunner; +import io.cdap.cdap.store.StoreDefinition; +import io.zonky.test.db.postgres.embedded.EmbeddedPostgres; +import java.io.IOException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.rules.TemporaryFolder; + +public class SqlSourceControlMetadataStoreTest extends SourceControlMetadataStoreTest { + + @ClassRule + public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder(); + + private static EmbeddedPostgres pg; + + @BeforeClass + public static void beforeClass() throws IOException, TableAlreadyExistsException { + CConfiguration cConf = CConfiguration.create(); + pg = PostgresInstantiator.createAndStart(cConf, TEMP_FOLDER.newFolder()); + Injector injector = Guice.createInjector( + new ConfigModule(cConf), + new LocalLocationModule(), + new SystemDatasetRuntimeModule().getInMemoryModules(), + new StorageModule(), + new AbstractModule() { + @Override + protected void configure() { + bind(MetricsCollectionService.class).to(NoOpMetricsCollectionService.class) + .in(Scopes.SINGLETON); + } + } + ); + + transactionRunner = injector.getInstance(TransactionRunner.class); + StoreDefinition.NamespaceSourceControlMetadataStore.create( + injector.getInstance(StructuredTableAdmin.class)); + } + + @AfterClass + public static void afterClass() throws IOException { + pg.close(); + } +} diff --git a/cdap-data-fabric/src/main/java/io/cdap/cdap/store/StoreDefinition.java b/cdap-data-fabric/src/main/java/io/cdap/cdap/store/StoreDefinition.java index 2f86499b25d0..c94e833fe14e 100644 --- a/cdap-data-fabric/src/main/java/io/cdap/cdap/store/StoreDefinition.java +++ b/cdap-data-fabric/src/main/java/io/cdap/cdap/store/StoreDefinition.java @@ -72,6 +72,8 @@ public static void createAllTables(StructuredTableAdmin tableAdmin) throws IOExc AppStateStore.create(tableAdmin); CredentialProviderStore.create(tableAdmin); OperationRunsStore.create(tableAdmin); + NamespaceSourceControlMetadataStore.create(tableAdmin); + RepositorySourceControlMetadataStore.create(tableAdmin); } /** @@ -179,6 +181,74 @@ public static void create(StructuredTableAdmin tableAdmin) throws IOException { } } + /** + * Schema for NamespaceSourceControlMetadata table. This table stores the source control metadata, + * i.e its file hash, commitID, last modified and sync status, of an entity within a namespace + */ + public static final class NamespaceSourceControlMetadataStore { + + public static final StructuredTableId NAMESPACE_SOURCE_CONTROL_METADATA = + new StructuredTableId("namespace_source_control_metadata"); + + public static final String NAMESPACE_FIELD = "namespace"; + public static final String TYPE_FIELD = "type"; + public static final String NAME_FIELD = "name"; + public static final String SPECIFICATION_HASH_FIELD = "specification_hash"; + public static final String COMMIT_ID_FIELD = "commit_id"; + public static final String LAST_MODIFIED_FIELD = "last_modified"; + public static final String IS_SYNCED_FIELD = "is_synced"; + + public static final StructuredTableSpecification NAMESPACE_SOURCE_CONTROL_METADATA_TABLE_SPEC = + new StructuredTableSpecification.Builder() + .withId(NAMESPACE_SOURCE_CONTROL_METADATA) + .withFields(Fields.stringType(NAMESPACE_FIELD), + Fields.stringType(TYPE_FIELD), + Fields.stringType(NAME_FIELD), + Fields.stringType(SPECIFICATION_HASH_FIELD), + Fields.stringType(COMMIT_ID_FIELD), + Fields.longType(LAST_MODIFIED_FIELD), + Fields.booleanType(IS_SYNCED_FIELD)) + .withPrimaryKeys(NAMESPACE_FIELD, TYPE_FIELD, NAME_FIELD) + .withIndexes(IS_SYNCED_FIELD, LAST_MODIFIED_FIELD) + .build(); + + public static void create(StructuredTableAdmin tableAdmin) throws IOException { + createIfNotExists(tableAdmin, NAMESPACE_SOURCE_CONTROL_METADATA_TABLE_SPEC); + } + } + + /** + * Schema for RespositorySourceControlMetadata table. This table stores the source control metadata, + * i.e its last modified and sync status, of an entity within the provided repository + */ + public static final class RepositorySourceControlMetadataStore { + + public static final StructuredTableId REPOSITORY_SOURCE_CONTROL_METADATA = + new StructuredTableId("repository_source_control_metadata"); + + public static final String NAMESPACE_FIELD = "namespace"; + public static final String TYPE_FIELD = "type"; + public static final String NAME_FIELD = "name"; + public static final String LAST_MODIFIED_FIELD = "last_modified"; + public static final String IS_SYNCED_FIELD = "is_synced"; + + public static final StructuredTableSpecification REPOSITORY_SOURCE_CONTROL_METADATA_TABLE_SPEC = + new StructuredTableSpecification.Builder() + .withId(REPOSITORY_SOURCE_CONTROL_METADATA) + .withFields(Fields.stringType(NAMESPACE_FIELD), + Fields.stringType(TYPE_FIELD), + Fields.stringType(NAME_FIELD), + Fields.longType(LAST_MODIFIED_FIELD), + Fields.booleanType(IS_SYNCED_FIELD)) + .withPrimaryKeys(NAMESPACE_FIELD, TYPE_FIELD, NAME_FIELD) + .withIndexes(IS_SYNCED_FIELD, LAST_MODIFIED_FIELD) + .build(); + + public static void create(StructuredTableAdmin tableAdmin) throws IOException { + createIfNotExists(tableAdmin, REPOSITORY_SOURCE_CONTROL_METADATA_TABLE_SPEC); + } + } + /** * Schema for workflow table. */ @@ -1351,7 +1421,7 @@ public static final class OperationRunsStore { Fields.longType(START_TIME_FIELD), Fields.longType(UPDATE_TIME_FIELD), Fields.stringType(DETAILS_FIELD) - ) + ) .withPrimaryKeys(NAMESPACE_FIELD, ID_FIELD) .withIndexes(TYPE_FIELD, STATUS_FIELD, START_TIME_FIELD) .build(); diff --git a/cdap-proto/src/main/java/io/cdap/cdap/proto/sourcecontrol/SortBy.java b/cdap-proto/src/main/java/io/cdap/cdap/proto/sourcecontrol/SortBy.java new file mode 100644 index 000000000000..3c710b25d213 --- /dev/null +++ b/cdap-proto/src/main/java/io/cdap/cdap/proto/sourcecontrol/SortBy.java @@ -0,0 +1,24 @@ +/* + * Copyright © 2024 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.proto.sourcecontrol; + +/** + * Sort order options for namespace and repository pipelines. + */ +public enum SortBy { + PIPELINE_NAME, LAST_SYNCED_DATE +}