Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CDAP-20993] Periodic refresh implementation of source control metadata for SCM sync status #15626

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import io.cdap.cdap.internal.app.services.RunRecordCorrectorService;
import io.cdap.cdap.internal.app.services.RunRecordMonitorService;
import io.cdap.cdap.internal.app.services.ScheduledRunRecordCorrectorService;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService;
import io.cdap.cdap.internal.app.store.DefaultStore;
import io.cdap.cdap.internal.bootstrap.guice.BootstrapModules;
import io.cdap.cdap.internal.capability.CapabilityModule;
Expand Down Expand Up @@ -214,6 +215,7 @@ protected void configure() {
bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class);
bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON);

Multibinder<String> servicesNamesBinder =
Multibinder.newSetBinder(binder(), String.class,
Expand Down Expand Up @@ -258,6 +260,7 @@ protected void configure() {
bind(MRJobInfoFetcher.class).to(LocalMRJobInfoFetcher.class);
bind(StorageProviderNamespaceAdmin.class).to(LocalStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON);

Multibinder<String> servicesNamesBinder =
Multibinder.newSetBinder(binder(), String.class,
Expand Down Expand Up @@ -315,6 +318,7 @@ protected void configure() {
bind(StorageProviderNamespaceAdmin.class)
.to(DistributedStorageProviderNamespaceAdmin.class);
bind(UGIProvider.class).toProvider(UgiProviderProvider.class);
bind(SourceControlMetadataRefreshService.class).in(Scopes.SINGLETON);

bind(ProgramRunDispatcher.class).to(RemoteProgramRunDispatcher.class)
.in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import io.cdap.cdap.security.impersonation.UGIProvider;
import io.cdap.cdap.security.spi.authorization.AccessEnforcer;
import io.cdap.cdap.security.spi.authorization.ContextAccessEnforcer;
import io.cdap.cdap.sourcecontrol.guice.SourceControlModule;
import io.cdap.cdap.store.DefaultOwnerStore;
import org.apache.twill.filesystem.LocationFactory;

Expand Down Expand Up @@ -185,6 +186,10 @@ protected void configure() {
bind(MetadataAdmin.class).to(DefaultMetadataAdmin.class);
expose(MetadataAdmin.class);

//TODO(adrika): cleanup this dependency later
// This is needed because there is a transitive dependency on source control operation
// runner which is not actually being used here
install(new SourceControlModule());
bindPreviewRunner(binder());
expose(PreviewRunner.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.app.store;

import io.cdap.cdap.proto.SourceControlMetadataRecord;
import java.util.List;

/**
* Represents a response containing a list of source control metadata records, next page token and
* last refresh time.
*/

public class ListSourceControlMetadataResponse {

private final List<SourceControlMetadataRecord> apps;
private final String nextPageToken;
private final Long lastRefreshTime;

/**
* Constructs a ListSourceControlMetadataResponse object.
*
* @param apps The list of source control metadata records.
* @param nextPageToken The token for fetching the next page of results.
* @param lastRefreshTime The timestamp of the last refresh operation.
*/
public ListSourceControlMetadataResponse(List<SourceControlMetadataRecord> apps,
String nextPageToken, Long lastRefreshTime) {
this.apps = apps;
this.nextPageToken = nextPageToken;
this.lastRefreshTime = lastRefreshTime;
}

public List<SourceControlMetadataRecord> getApps() {
return apps;
}

public String getNextPageToken() {
return nextPageToken;
}

public Long getLastRefreshTime() {
return lastRefreshTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright © 2024 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.app.store;

import io.cdap.cdap.proto.SourceControlMetadataRecord;

/**
* Represents a response containing a single source control metadata record and last refresh time.
*/
public class SingleSourceControlMetadataResponse {

private final SourceControlMetadataRecord app;
private final Long lastRefreshTime;

/**
* Constructs a SingleSourceControlMetadataResponse object.
*
* @param app The source control metadata record.
* @param lastRefreshTime The timestamp of the last refresh operation.
*/
public SingleSourceControlMetadataResponse(SourceControlMetadataRecord app,
Long lastRefreshTime) {
this.app = app;
this.lastRefreshTime = lastRefreshTime;
}

public SourceControlMetadataRecord getApp() {
return app;
}

public Long getLastRefreshTime() {
return lastRefreshTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@
* @return a {@link Map} from the {@link ProgramId} to the list of run records; there will be no entry for programs
* that do not exist.
*/
Map<ProgramId, Collection<RunRecordDetail>> getActiveRuns(Collection<ProgramReference> programRefs);

Check warning on line 355 in cdap-app-fabric/src/main/java/io/cdap/cdap/app/store/Store.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.coding.OverloadMethodsDeclarationOrderCheck

All overloaded methods should be placed next to each other. Placing non-overloaded methods in between overloaded methods with the same type is a violation. Previous overloaded method located at line '322'.

/**
* Fetches the active (i.e STARTING or RUNNING or SUSPENDED) run records for the
Expand Down Expand Up @@ -495,8 +495,6 @@
int scanRepositorySourceControlMetadata(ScanSourceControlMetadataRequest request,
Consumer<SourceControlMetadataRecord> consumer);

void updateSourceControlMeta(ApplicationReference appRef, String repoFileHash);

/**
* Returns a Map of {@link ApplicationMeta} for the given set of {@link ApplicationId}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import io.cdap.cdap.api.security.AccessException;
import io.cdap.cdap.app.runtime.ProgramRuntimeService;
import io.cdap.cdap.app.store.ApplicationFilter;
import io.cdap.cdap.app.store.ListSourceControlMetadataResponse;
import io.cdap.cdap.app.store.ScanApplicationsRequest;
import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest;
import io.cdap.cdap.app.store.SingleSourceControlMetadataResponse;
import io.cdap.cdap.common.ApplicationNotFoundException;
import io.cdap.cdap.common.ArtifactAlreadyExistsException;
import io.cdap.cdap.common.BadRequestException;
Expand Down Expand Up @@ -129,7 +131,6 @@
* Key in json paginated applications list response.
*/
public static final String APP_LIST_PAGINATED_KEY = "applications";
public static final String APP_LIST_PAGINATED_KEY_SHORT = "apps";

/**
* Runtime program service for running and managing programs.
Expand Down Expand Up @@ -319,27 +320,29 @@
@QueryParam("filter") String filter
) throws Exception {
validateNamespace(namespaceId);

JsonPaginatedListResponder.respond(GSON, responder, APP_LIST_PAGINATED_KEY_SHORT,
jsonListResponder -> {
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = applicationLifecycleService.scanSourceControlMetadata(
scanRequest, batchSize,
scmMetaRecord -> {
jsonListResponder.send(scmMetaRecord);
lastRecord.set(scmMetaRecord);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
return !pageLimitReached || record == null ? null : record.getName();
});
List<SourceControlMetadataRecord> apps = new ArrayList<>();
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = applicationLifecycleService.scanSourceControlMetadata(
scanRequest, batchSize,
scmMetaRecord -> {
apps.add(scmMetaRecord);
lastRecord.set(scmMetaRecord);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please throw exception and let the http exception handler to handle the response. If you send here, you need to return from the catch block, otherwise the flow will continue.

}
SourceControlMetadataRecord record = lastRecord.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lastRecord variable seems unnecessary. The same can be achieved by apps.isEmpty() and apps.get(apps.size() - 1).

String nextPageToken = !pageLimitReached || record == null ? null :
record.getName();
Long lastRefreshTime = applicationLifecycleService.getLastRefreshTime(namespaceId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would execute another SQL in another transaction, which may be inconsistent with what has been returned by the scanSourceControlMetadata call. I think it's better for the scan method to return a response that contains apps, page limit, and last scan time. This makes sure all the data are coming from the same transaction.

ListSourceControlMetadataResponse response = new ListSourceControlMetadataResponse(apps,
nextPageToken, lastRefreshTime);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response));
}

/**
Expand All @@ -357,10 +360,11 @@
@PathParam("namespace-id") final String namespaceId,
@PathParam("app-id") final String appName) throws Exception {
validateApplicationId(namespaceId, appName);

responder.sendJson(HttpResponseStatus.OK,
GSON.toJson(applicationLifecycleService.getSourceControlMetadataRecord(
new ApplicationReference(namespaceId, appName))));
SourceControlMetadataRecord app = applicationLifecycleService.getSourceControlMetadataRecord(
new ApplicationReference(namespaceId, appName));
Long lastRefreshTime = applicationLifecycleService.getLastRefreshTime(namespaceId);
SingleSourceControlMetadataResponse response = new SingleSourceControlMetadataResponse(app, lastRefreshTime);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response));
}

private ScanApplicationsRequest getScanRequest(String namespaceId, String artifactVersion,
Expand All @@ -375,7 +379,7 @@
}
if (nameFilter != null && !nameFilter.isEmpty()) {
if (nameFilterType != null) {
switch (nameFilterType) {

Check warning on line 382 in cdap-app-fabric/src/main/java/io/cdap/cdap/gateway/handlers/AppLifecycleHttpHandler.java

View workflow job for this annotation

GitHub Actions / Checkstyle

com.puppycrawl.tools.checkstyle.checks.coding.MissingSwitchDefaultCheck

switch without "default" clause.
case EQUALS:
builder.setApplicationReference(new ApplicationReference(namespaceId, nameFilter));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import com.google.gson.JsonSyntaxException;
import com.google.inject.Inject;
import io.cdap.cdap.api.feature.FeatureFlagsProvider;
import io.cdap.cdap.app.store.ListSourceControlMetadataResponse;
import io.cdap.cdap.app.store.ScanSourceControlMetadataRequest;
import io.cdap.cdap.common.BadRequestException;
import io.cdap.cdap.common.ForbiddenException;
import io.cdap.cdap.common.NotFoundException;
import io.cdap.cdap.common.conf.CConfiguration;
import io.cdap.cdap.common.conf.Constants;
import io.cdap.cdap.common.conf.Constants.AppFabric;
Expand Down Expand Up @@ -59,6 +59,8 @@
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
Expand Down Expand Up @@ -156,29 +158,29 @@ public void listAllApplications(FullHttpRequest request, HttpResponder responder
@QueryParam("filter") String filter) throws Exception {
checkSourceControlFeatureFlag();
validateNamespaceId(namespaceId);
JsonPaginatedListResponder.respond(GSON, responder, APP_LIST_PAGINATED_KEY_SHORT,
jsonListResponder -> {
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = sourceControlService.scanRepoMetadata(
scanRequest, batchSize,
record -> {
jsonListResponder.send(record);
lastRecord.set(record);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
} catch (NotFoundException e) {
responder.sendString(HttpResponseStatus.NOT_FOUND, e.getMessage());
}
SourceControlMetadataRecord record = lastRecord.get();
return !pageLimitReached || record == null ? null :
record.getName();
});
List<SourceControlMetadataRecord> apps = new ArrayList<>();
AtomicReference<SourceControlMetadataRecord> lastRecord = new AtomicReference<>(null);
ScanSourceControlMetadataRequest scanRequest = SourceControlMetadataHelper.getScmStatusScanRequest(
namespaceId,
pageToken, pageSize, sortOrder, sortOn, filter);
boolean pageLimitReached = false;
try {
pageLimitReached = sourceControlService.scanRepoMetadata(
scanRequest, batchSize,
record -> {
apps.add(record);
lastRecord.set(record);
});
} catch (IOException e) {
responder.sendString(HttpResponseStatus.INTERNAL_SERVER_ERROR, e.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above. Please throw. The old code was wrong too, please don't follow with that pattern.

}
SourceControlMetadataRecord record = lastRecord.get();
String nextPageToken = !pageLimitReached || record == null ? null :
record.getName();
Long lastRefreshTime = sourceControlService.getLastRefreshTime(namespaceId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be using primitive long instead.

ListSourceControlMetadataResponse response = new ListSourceControlMetadataResponse(apps,
samdgupi marked this conversation as resolved.
Show resolved Hide resolved
nextPageToken, lastRefreshTime);
responder.sendJson(HttpResponseStatus.OK, GSON.toJson(response));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.cdap.cdap.common.security.HttpsEnabler;
import io.cdap.cdap.features.Feature;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataMigrationService;
import io.cdap.cdap.internal.app.sourcecontrol.SourceControlMetadataRefreshService;
import io.cdap.cdap.internal.app.store.AppMetadataStore;
import io.cdap.cdap.internal.bootstrap.BootstrapService;
import io.cdap.cdap.internal.credential.CredentialProviderService;
Expand Down Expand Up @@ -97,6 +98,7 @@ public class AppFabricServer extends AbstractIdleService {
private final RepositoryCleanupService repositoryCleanupService;
private final OperationNotificationSubscriberService operationNotificationSubscriberService;
private final SourceControlMetadataMigrationService sourceControlMetadataMigrationService;
private final SourceControlMetadataRefreshService sourceControlMetadataRefreshService;
private final CConfiguration cConf;
private final SConfiguration sConf;
private final boolean sslEnabled;
Expand Down Expand Up @@ -137,7 +139,8 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
SourceControlOperationRunner sourceControlOperationRunner,
RepositoryCleanupService repositoryCleanupService,
OperationNotificationSubscriberService operationNotificationSubscriberService,
SourceControlMetadataMigrationService sourceControlMetadataMigrationService) {
SourceControlMetadataMigrationService sourceControlMetadataMigrationService,
SourceControlMetadataRefreshService sourceControlMetadataRefreshService) {
this.hostname = hostname;
this.discoveryService = discoveryService;
this.handlers = handlers;
Expand Down Expand Up @@ -167,6 +170,7 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
this.repositoryCleanupService = repositoryCleanupService;
this.operationNotificationSubscriberService = operationNotificationSubscriberService;
this.sourceControlMetadataMigrationService = sourceControlMetadataMigrationService;
this.sourceControlMetadataRefreshService = sourceControlMetadataRefreshService;
}

/**
Expand Down Expand Up @@ -199,7 +203,8 @@ protected void startUp() throws Exception {
sourceControlOperationRunner.start(),
repositoryCleanupService.start(),
operationNotificationSubscriberService.start(),
sourceControlMetadataMigrationService.start()
sourceControlMetadataMigrationService.start(),
sourceControlMetadataRefreshService.start()
));
Futures.allAsList(futuresList).get();

Expand Down Expand Up @@ -262,6 +267,7 @@ protected void shutDown() throws Exception {
namespaceCredentialProviderService.stopAndWait();
operationNotificationSubscriberService.stopAndWait();
sourceControlMetadataMigrationService.stopAndWait();
sourceControlMetadataRefreshService.stopAndWait();
}

private Cancellable startHttpService(NettyHttpService httpService) throws Exception {
Expand Down
Loading
Loading